Paho_MQTT_C_Library库下几个比较重要的函数:
MQTTPacket.c1
int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees);
SSLScoket.c1
int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees)
SSLScoket.c1
int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees)
移植过程
首先每个工程里都包含MQTTClient_create函数,若定义了OPENSSL宏定义,则编译相对应的SSL部分函数以及变量。1
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId, int persistence_type, void* persistence_context)
例如在MQTTClient.c里,1
2
3
4
5
6
7
8
9#if defined(OPENSSL)
SSLSocket_initialize(); //初始化SSLSocket
#endif
#if defined(OPENSSL)
serverURI += strlen(URI_SSL);//增加URI前导名"ssl://"
m->ssl = 1;
#else
rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
goto exit;
在MQTTPacket.c里,发送函数MQTTPacket_sends中也是如此,如果定义了OPENSSL则调用SSL的发送函数,否则不加SSL发送。1
2
3
4
5
6#if defined(OPENSSL)
if (net->ssl)
rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, count, buffers, buflens, frees);
else
#endif
rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, buflens, frees);
这个结构里定义了MQTTClient_SSLOptions所需要的几个变量1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36typedef struct
{
/** The eyecatcher for this structure. Must be MQTS */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** The file in PEM format containing the public digital certificates trusted by the client. */
const char* trustStore;
/** The file in PEM format containing the public certificate chain of the client. It may also include
* the client's private key.
*/
const char* keyStore;
/** If not included in the sslKeyStore, this setting points to the file in PEM format containing
* the client's private key.
*/
const char* privateKey;
/** The password to load the client's privateKey if encrypted. */
const char* privateKeyPassword;
/**
* The list of cipher suites that the client will present to the server during the SSL handshake. For a
* full explanation of the cipher list format, please see the OpenSSL on-line documentation:
* http://www.openssl.org/docs/apps/ciphers.html#CIPHER_LIST_FORMAT
* If this setting is ommitted, its default value will be "ALL", that is, all the cipher suites -excluding
* those offering no encryption- will be considered.
* This setting can be used to set an SSL anonymous connection ("aNULL" string value, for instance).
*/
const char* enabledCipherSuites;
/** True/False option to enable verification of the server certificate **/
int enableServerCertAuth;
} MQTTClient_SSLOptions;
- struct_id必须是MQTS字符
- struct_version必须是0
- trustStore是公钥,PEM格式
- keyStore是公钥密码?PEM格式
- privateKey是私钥,如果sslKeyStore没有
- privateKeyPassword是私钥的密码
- enabledCipherSuites是否启用匿名连接
- enableServerCertAuth是否启用服务器证书验证
再看几个结构体1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30typedef struct
{
char* clientID; /**< the string id of the client */
const char* username; /**< MQTT v3.1 user name */
int passwordlen; /**< MQTT password length */
const void* password; /**< MQTT v3.1 binary password */
unsigned int cleansession : 1; /**< MQTT clean session flag */
unsigned int connected : 1; /**< whether it is currently connected */
unsigned int good : 1; /**< if we have an error on the socket we turn this off */
unsigned int ping_outstanding : 1;
int connect_state : 4;
networkHandles net;
int msgID;
int keepAliveInterval;
int retryInterval;
int maxInflightMessages;
willMessages* will;
List* inboundMsgs;
List* outboundMsgs; /**< in flight */
List* messageQueue;
unsigned int qentry_seqno;
void* phandle; /* the persistence handle */
MQTTClient_persistence* persistence; /* a persistence implementation */
void* context; /* calling context - used when calling disconnect_internal */
int MQTTVersion;
#if defined(OPENSSL)
MQTTClient_SSLOptions *sslopts;
SSL_SESSION* session; /***< SSL session pointer for fast handhake */
#endif
} Clients;
其中包含的MQTTClient_SSLOptions结构体正是需要SSL连接时用到的证书类的结构体,这里结构体指针为sslopts。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20typedef struct
{
char* serverURI;
#if defined(OPENSSL)
int ssl;
#endif
Clients* c;
MQTTClient_connectionLost* cl;
MQTTClient_messageArrived* ma;
MQTTClient_deliveryComplete* dc;
void* context;
sem_type connect_sem;
int rc; /* getsockopt return code in connect */
sem_type connack_sem;
sem_type suback_sem;
sem_type unsuback_sem;
MQTTPacket* pack;
} MQTTClients;
static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions options, const char serverURI)
{
MQTTClients* m = handle;
START_TIME_TYPE start;
long millisecsTimeout = 30000L;
int rc = SOCKET_ERROR;
int MQTTVersion = 0;
FUNC_ENTRY;
millisecsTimeout = options->connectTimeout * 1000;
start = MQTTClient_start_clock();
m->c->keepAliveInterval = options->keepAliveInterval;
setRetryLoopInterval(options->keepAliveInterval);
m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
if (m->c->will)
{
free(m->c->will->payload);
free(m->c->will->topic);
free(m->c->will);
m->c->will = NULL;
}
if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
{
const void* source = NULL;
m->c->will = malloc(sizeof(willMessages));
if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
{
if (options->will->struct_version == 1 && options->will->payload.data)
{
m->c->will->payloadlen = options->will->payload.len;
source = options->will->payload.data;
}
else
{
m->c->will->payloadlen = strlen(options->will->message);
source = (void*)options->will->message;
}
m->c->will->payload = malloc(m->c->will->payloadlen);
memcpy(m->c->will->payload, source, m->c->will->payloadlen);
}
else
{
m->c->will->payload = NULL;
m->c->will->payloadlen = 0;
}
m->c->will->qos = options->will->qos;
m->c->will->retained = options->will->retained;
m->c->will->topic = MQTTStrdup(options->will->topicName);
}
#if defined(OPENSSL)
if (m->c->sslopts)
{
if (m->c->sslopts->trustStore)
free((void)m->c->sslopts->trustStore);
if (m->c->sslopts->keyStore)
free((void)m->c->sslopts->keyStore);
if (m->c->sslopts->privateKey)
free((void)m->c->sslopts->privateKey);
if (m->c->sslopts->privateKeyPassword)
free((void)m->c->sslopts->privateKeyPassword);
if (m->c->sslopts->enabledCipherSuites)
free((void*)m->c->sslopts->enabledCipherSuites);
free(m->c->sslopts);
m->c->sslopts = NULL;
}
if (options->struct_version != 0 && options->ssl)
{
m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions));
memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
if (options->ssl->trustStore)
m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
if (options->ssl->keyStore)
m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
if (options->ssl->privateKey)
m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
if (options->ssl->privateKeyPassword)
m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
if (options->ssl->enabledCipherSuites)
m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
}
#endif
m->c->username = options->username;
m->c->password = options->password;
if (options->password)
m->c->passwordlen = strlen(options->password);
else if (options->struct_version >= 5 && options->binarypwd.data)
{
m->c->password = options->binarypwd.data;
m->c->passwordlen = options->binarypwd.len;
}
m->c->retryInterval = options->retryInterval;
if (options->struct_version >= 3)
MQTTVersion = options->MQTTVersion;
else
MQTTVersion = MQTTVERSION_DEFAULT;
if (MQTTVersion == MQTTVERSION_DEFAULT)
{
if ((rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout)) != MQTTCLIENT_SUCCESS)
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout);
}
else
rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout);
FUNC_EXIT_RC(rc);
return rc;
}
该函数前面部分在导入需要的参数,关键是MQTTClient_connectURIVersion函数
static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions options, const char serverURI, int MQTTVersion,
START_TIME_TYPE start, long millisecsTimeout)
{
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
int sessionPresent = 0;
FUNC_ENTRY;
if (m->ma && !running)
{
Thread_start(MQTTClient_run, handle);
if (MQTTClient_elapsed(start) >= millisecsTimeout)
{
rc = SOCKET_ERROR;
goto exit;
}
MQTTClient_sleep(100L);
}
Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, MQTTVersion);
#else
rc = MQTTProtocol_connect(serverURI, m->c, MQTTVersion);
#endif
if (rc == SOCKET_ERROR)
goto exit;
if (m->c->connect_state == 0)
{
rc = SOCKET_ERROR;
goto exit;
}
if (m->c->connect_state == 1) /* TCP connect started - wait for completion */
{
Thread_unlock_mutex(mqttclient_mutex);
MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
Thread_lock_mutex(mqttclient_mutex);
if (rc != 0)
{
rc = SOCKET_ERROR;
goto exit;
}
#if defined(OPENSSL)
if (m->ssl)
{
int port;
char* hostname;
int setSocketForSSLrc = 0;
hostname = MQTTProtocol_addressPort(m->serverURI, &port);
setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts, hostname);
if (hostname != m->serverURI)
free(hostname);
if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
{
if (m->c->session != NULL)
if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket);
if (rc == TCPSOCKET_INTERRUPTED)
m->c->connect_state = 2; /* the connect is still in progress */
else if (rc == SSL_FATAL)
{
rc = SOCKET_ERROR;
goto exit;
}
else if (rc == 1)
{
rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3;
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
}
if (!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
}
}
else
{
rc = SOCKET_ERROR;
goto exit;
}
}
else
{
#endif
m->c->connect_state = 3; / TCP connect completed, in which case send the MQTT connect packet /
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
}
#if defined(OPENSSL)
}
#endif
}
#if defined(OPENSSL)
if (m->c->connect_state == 2) / SSL connect sent - wait for completion /
{
Thread_unlock_mutex(mqttclient_mutex);
MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
Thread_lock_mutex(mqttclient_mutex);
if (rc != 1)
{
rc = SOCKET_ERROR;
goto exit;
}
if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; / TCP connect completed, in which case send the MQTT connect packet /
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
}
}
#endif
if (m->c->connect_state == 3) /* MQTT connect sent - wait for CONNACK */
{
MQTTPacket* pack = NULL;
Thread_unlock_mutex(mqttclient_mutex);
pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTClient_elapsed(start));
Thread_lock_mutex(mqttclient_mutex);
if (pack == NULL)
rc = SOCKET_ERROR;
else
{
Connack* connack = (Connack*)pack;
Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
{
m->c->connected = 1;
m->c->good = 1;
m->c->connect_state = 0;
if (MQTTVersion == 4)
sessionPresent = connack->flags.bits.sessionPresent;
if (m->c->cleansession)
rc = MQTTClient_cleanSession(m->c);
if (m->c->outboundMsgs->count > 0)
{
ListElement* outcurrent = NULL;
while (ListNextElement(m->c->outboundMsgs, &outcurrent))
{
Messages* m = (Messages*)(outcurrent->content);
m->lastTouch = 0;
}
MQTTProtocol_retry((time_t)0, 1, 1);
if (m->c->connected != 1)
rc = MQTTCLIENT_DISCONNECTED;
}
}
free(connack);
m->pack = NULL;
}
}
exit:
if (rc == MQTTCLIENT_SUCCESS)
{
if (options->struct_version == 4) / means we have to fill out return values /
{
options->returned.serverURI = serverURI;
options->returned.MQTTVersion = MQTTVersion;
options->returned.sessionPresent = sessionPresent;
}
}
else
MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3)); / don’t want to call connection lost /
FUNC_EXIT_RC(rc);
return rc;
}
这个函数里主要是调用了SSLSocket_setSocketForSSL和SSLSocket_connect