Paho_MQTT_C

Paho_MQTT_C_Library库下几个比较重要的函数:
MQTTPacket.c

1
int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees);

SSLSocket.c

1
int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees)

SSLSocket.c

1
int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees)

移植过程

首先每个工程里都包含MQTTClient_create函数,若定义了OPENSSL宏定义,则编译相对应的SSL部分函数以及变量。

1
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId, int persistence_type, void* persistence_context)

例如在MQTTClient.c里,

1
2
3
4
5
6
7
8
9
/* Excerpts from MQTTClient_create (MQTTClient.c): the SSL-specific statements
 * are compiled only when the OPENSSL macro is defined.
 * NOTE(review): the #endif closing the second conditional is not part of this excerpt. */
#if defined(OPENSSL)
SSLSocket_initialize(); /* initialise the SSLSocket layer */
#endif
#if defined(OPENSSL)
serverURI += strlen(URI_SSL); /* advance serverURI past the URI_SSL scheme prefix (presumably "ssl://") */
m->ssl = 1; /* record that this client uses SSL */
#else
rc = MQTTCLIENT_SSL_NOT_SUPPORTED; /* built without OPENSSL: report SSL as unsupported and bail out */
goto exit;

在MQTTPacket.c里,发送函数MQTTPacket_sends中也是如此,如果定义了OPENSSL则调用SSL的发送函数,否则不加SSL发送。

1
2
3
4
5
6
/* Excerpt from MQTTPacket_sends: choose the transport used to write the buffers.
 * With OPENSSL defined, a non-zero net->ssl routes the data through
 * SSLSocket_putdatas; otherwise (or when OPENSSL is not defined at all) the
 * data goes through Socket_putdatas. */
#if defined(OPENSSL)
if (net->ssl)
rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, count, buffers, buflens, frees);
else
#endif
/* This statement is the else-branch when OPENSSL is defined, and the only statement when it is not. */
rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, buflens, frees);

这个结构里定义了MQTTClient_SSLOptions所需要的几个变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * SSL/TLS settings supplied by the application when connecting over SSL.
 * All file-valued members name files in PEM format.
 */
typedef struct
{
    /** Eyecatcher identifying this structure; must be MQTS. */
    char struct_id[4];

    /** Version number of this structure; must be 0. */
    int struct_version;

    /** PEM file holding the public digital certificates the client trusts. */
    const char* trustStore;

    /**
     * PEM file holding the client's public certificate chain.
     * It may also contain the client's private key.
     */
    const char* keyStore;

    /**
     * PEM file holding the client's private key, for the case where the key
     * is not already included in the key store.
     */
    const char* privateKey;

    /** Password used to load the client's private key when it is encrypted. */
    const char* privateKeyPassword;

    /**
     * Cipher suites the client presents to the server during the SSL
     * handshake. The cipher list format is described in the OpenSSL on-line
     * documentation:
     * http://www.openssl.org/docs/apps/ciphers.html#CIPHER_LIST_FORMAT
     * When omitted, the default is "ALL": every cipher suite except those
     * offering no encryption is considered. The setting can also select an
     * anonymous SSL connection (for instance with the "aNULL" string value).
     */
    const char* enabledCipherSuites;

    /** True/False option: enable verification of the server certificate. */
    int enableServerCertAuth;

} MQTTClient_SSLOptions;

  • struct_id必须是MQTS字符
  • struct_version必须是0
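  • trustStore是客户端信任的公钥证书(如CA证书)文件,PEM格式
  • keyStore是客户端自身的公钥证书链文件,PEM格式,其中也可以包含客户端的私钥
  • privateKey是客户端私钥文件(PEM格式),仅当keyStore中没有包含私钥时才需要设置
  • privateKeyPassword是私钥的密码(私钥被加密时使用)
  • enabledCipherSuites是SSL握手时客户端提供给服务器的加密套件列表,缺省为"ALL";设置为"aNULL"之类的值可用于匿名连接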
  • enableServerCertAuth是否启用服务器证书验证
    再看几个结构体
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    /**
     * Per-client protocol state: credentials, connection flags, timing
     * settings, message lists and persistence hooks.
     */
    typedef struct
    {
        /* --- identity and credentials --- */
        /** the string id of the client */
        char* clientID;
        /** MQTT v3.1 user name */
        const char* username;
        /** MQTT password length */
        int passwordlen;
        /** MQTT v3.1 binary password */
        const void* password;

        /* --- connection flags (bit-fields) --- */
        /** MQTT clean session flag */
        unsigned int cleansession : 1;
        /** whether it is currently connected */
        unsigned int connected : 1;
        /** turned off when an error occurs on the socket */
        unsigned int good : 1;
        unsigned int ping_outstanding : 1;
        /** progress of a connect attempt; stored in a 4-bit signed field */
        int connect_state : 4;

        /* --- network and timing --- */
        networkHandles net;
        int msgID;
        int keepAliveInterval;
        int retryInterval;
        int maxInflightMessages;

        /* --- messages --- */
        willMessages* will;
        List* inboundMsgs;
        /** messages in flight */
        List* outboundMsgs;
        List* messageQueue;
        unsigned int qentry_seqno;

        /* --- persistence and calling context --- */
        /** the persistence handle */
        void* phandle;
        /** a persistence implementation */
        MQTTClient_persistence* persistence;
        /** calling context - used when calling disconnect_internal */
        void* context;
        int MQTTVersion;

    #if defined(OPENSSL)
        /* --- SSL state, present only in OPENSSL builds --- */
        MQTTClient_SSLOptions *sslopts;
        /** SSL session pointer for fast handshake */
        SSL_SESSION* session;
    #endif
    } Clients;

其中包含的MQTTClient_SSLOptions结构体正是需要SSL连接时用到的证书类的结构体,这里结构体指针为sslopts。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Application-facing client object: the server address, the application's
 * callbacks, the underlying Clients state, and the semaphores and packet slot
 * used while waiting for responses.
 */
typedef struct
{
    char* serverURI;
#if defined(OPENSSL)
    /* Non-zero when the connection is to use SSL (OPENSSL builds only). */
    int ssl;
#endif
    /* Underlying per-client protocol state. */
    Clients* c;

    /* Application callbacks and the context pointer stored alongside them. */
    MQTTClient_connectionLost* cl;
    MQTTClient_messageArrived* ma;
    MQTTClient_deliveryComplete* dc;
    void* context;

    /* Synchronisation objects and results for the operations named by each field. */
    sem_type connect_sem;
    /* getsockopt return code in connect */
    int rc;
    sem_type connack_sem;
    sem_type suback_sem;
    sem_type unsuback_sem;
    MQTTPacket* pack;

} MQTTClients;

/**
 * Copy the caller's connect options into the client state and then attempt
 * the connection to one server URI.
 *
 * The will message and (in OPENSSL builds) the SSL options are deep-copied,
 * replacing any copies left from a previous call. The username and password
 * pointers are stored as given, not copied.
 *
 * When the requested MQTT version is MQTTVERSION_DEFAULT, version 4 is tried
 * first and version 3 is tried if that attempt does not succeed.
 *
 * @param handle    the client handle (used as an MQTTClients pointer)
 * @param options   the connect options supplied by the application
 * @param serverURI the server address handed to MQTTClient_connectURIVersion
 * @return the result of the last MQTTClient_connectURIVersion call, or
 *         SOCKET_ERROR when the will or SSL option structure cannot be allocated
 */
static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI)
{
    MQTTClients* m = handle;
    START_TIME_TYPE start;
    long millisecsTimeout = 30000L;
    int rc = SOCKET_ERROR;
    int MQTTVersion = 0;

    FUNC_ENTRY;
    millisecsTimeout = options->connectTimeout * 1000;
    start = MQTTClient_start_clock();

    m->c->keepAliveInterval = options->keepAliveInterval;
    setRetryLoopInterval(options->keepAliveInterval);
    m->c->cleansession = options->cleansession;
    m->c->maxInflightMessages = (options->reliable) ? 1 : 10;

    /* Discard the will message kept from a previous connect, if any. */
    if (m->c->will)
    {
        free(m->c->will->payload);
        free(m->c->will->topic);
        free(m->c->will);
        m->c->will = NULL;
    }

    if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
    {
        const void* source = NULL;

        m->c->will = malloc(sizeof(willMessages));
        if (m->c->will == NULL)
            goto exit; /* rc is still SOCKET_ERROR */
        if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
        {
            /* A version 1 binary payload takes precedence over the string message. */
            if (options->will->struct_version == 1 && options->will->payload.data)
            {
                m->c->will->payloadlen = options->will->payload.len;
                source = options->will->payload.data;
            }
            else
            {
                m->c->will->payloadlen = strlen(options->will->message);
                source = (void*)options->will->message;
            }
            /* NOTE(review): this allocation is still unchecked; a zero-length
             * payload makes a NULL result legitimate, so handle with care. */
            m->c->will->payload = malloc(m->c->will->payloadlen);
            memcpy(m->c->will->payload, source, m->c->will->payloadlen);
        }
        else
        {
            m->c->will->payload = NULL;
            m->c->will->payloadlen = 0;
        }
        m->c->will->qos = options->will->qos;
        m->c->will->retained = options->will->retained;
        m->c->will->topic = MQTTStrdup(options->will->topicName);
    }

#if defined(OPENSSL)
    /* Discard the SSL options kept from a previous connect; free(NULL) is a no-op. */
    if (m->c->sslopts)
    {
        free((void*)m->c->sslopts->trustStore);
        free((void*)m->c->sslopts->keyStore);
        free((void*)m->c->sslopts->privateKey);
        free((void*)m->c->sslopts->privateKeyPassword);
        free((void*)m->c->sslopts->enabledCipherSuites);
        free(m->c->sslopts);
        m->c->sslopts = NULL;
    }

    if (options->struct_version != 0 && options->ssl)
    {
        m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions));
        if (m->c->sslopts == NULL)
            goto exit; /* rc is still SOCKET_ERROR */
        memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
        if (options->ssl->trustStore)
            m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
        if (options->ssl->keyStore)
            m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
        if (options->ssl->privateKey)
            m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
        if (options->ssl->privateKeyPassword)
            m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
        if (options->ssl->enabledCipherSuites)
            m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
        m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
    }
#endif

    /* Credentials are referenced, not copied. */
    m->c->username = options->username;
    m->c->password = options->password;
    if (options->password)
        m->c->passwordlen = (int)strlen(options->password);
    else if (options->struct_version >= 5 && options->binarypwd.data)
    {
        /* No string password: use the binary password available from struct version 5. */
        m->c->password = options->binarypwd.data;
        m->c->passwordlen = options->binarypwd.len;
    }
    m->c->retryInterval = options->retryInterval;

    if (options->struct_version >= 3)
        MQTTVersion = options->MQTTVersion;
    else
        MQTTVersion = MQTTVERSION_DEFAULT;

    if (MQTTVersion == MQTTVERSION_DEFAULT)
    {
        /* Try version 4 first, then version 3 if that does not succeed. */
        if ((rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout)) != MQTTCLIENT_SUCCESS)
            rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout);
    }
    else
        rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout);

exit:
    FUNC_EXIT_RC(rc);
    return rc;
}

该函数前面部分在导入需要的参数,关键是MQTTClient_connectURIVersion函数

/**
 * Make one connection attempt to a server with a specific MQTT version.
 *
 * The attempt moves through the values of connect_state handled below:
 * 1 while the TCP connect is in progress, 2 while the SSL connect is in
 * progress (OPENSSL builds only), 3 once the MQTT CONNECT packet has been sent
 * and a CONNACK is awaited, and 0 once the CONNACK reports success.
 * mqttclient_mutex is released around every MQTTClient_waitfor call and
 * re-acquired afterwards.
 *
 * @param handle           the client handle (used as an MQTTClients pointer)
 * @param options          the connect options; on success its returned member
 *                         is filled in when struct_version is 4 or later
 * @param serverURI        the server address passed to MQTTProtocol_connect
 * @param MQTTVersion      the MQTT protocol version to use for this attempt
 * @param start            the time at which the overall connect began
 * @param millisecsTimeout the overall connect timeout in milliseconds
 * @return MQTTCLIENT_SUCCESS, SOCKET_ERROR, MQTTCLIENT_DISCONNECTED, the
 *         CONNACK return code, or the result of MQTTClient_cleanSession
 */
static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
    START_TIME_TYPE start, long millisecsTimeout)
{
    MQTTClients* m = handle;
    int rc = SOCKET_ERROR;
    int sessionPresent = 0;

    FUNC_ENTRY;
    /* A message-arrived callback is set but the background thread is not running: start it. */
    if (m->ma && !running)
    {
        Thread_start(MQTTClient_run, handle);
        if (MQTTClient_elapsed(start) >= millisecsTimeout)
        {
            rc = SOCKET_ERROR;
            goto exit;
        }
        MQTTClient_sleep(100L);
    }

    Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
#if defined(OPENSSL)
    rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, MQTTVersion);
#else
    rc = MQTTProtocol_connect(serverURI, m->c, MQTTVersion);
#endif
    if (rc == SOCKET_ERROR)
        goto exit;

    if (m->c->connect_state == 0)
    {
        rc = SOCKET_ERROR;
        goto exit;
    }

    if (m->c->connect_state == 1) /* TCP connect started - wait for completion */
    {
        Thread_unlock_mutex(mqttclient_mutex);
        MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
        Thread_lock_mutex(mqttclient_mutex);
        if (rc != 0)
        {
            rc = SOCKET_ERROR;
            goto exit;
        }

#if defined(OPENSSL)
        if (m->ssl)
        {
            int port;
            char* hostname;
            int setSocketForSSLrc = 0;

            hostname = MQTTProtocol_addressPort(m->serverURI, &port);
            setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts, hostname);
            /* Free hostname only when it is not the serverURI pointer itself. */
            if (hostname != m->serverURI)
                free(hostname);

            /* NOTE(review): the handshake path runs when the result differs from
             * MQTTCLIENT_SUCCESS - presumably SSLSocket_setSocketForSSL returns 1
             * on success; confirm against its definition. */
            if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
            {
                if (m->c->session != NULL)
                    if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
                        Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
                rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket);
                if (rc == TCPSOCKET_INTERRUPTED)
                    m->c->connect_state = 2;  /* the connect is still in progress */
                else if (rc == SSL_FATAL)
                {
                    rc = SOCKET_ERROR;
                    goto exit;
                }
                else if (rc == 1)
                {
                    rc = MQTTCLIENT_SUCCESS;
                    m->c->connect_state = 3;
                    if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
                    {
                        rc = SOCKET_ERROR;
                        goto exit;
                    }
                    /* Keep the SSL session for a later reconnect when the MQTT session is not clean. */
                    if (!m->c->cleansession && m->c->session == NULL)
                        m->c->session = SSL_get1_session(m->c->net.ssl);
                }
            }
            else
            {
                rc = SOCKET_ERROR;
                goto exit;
            }
        }
        else
        {
#endif
            m->c->connect_state = 3; /* TCP connect completed, in which case send the MQTT connect packet */
            if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
            {
                rc = SOCKET_ERROR;
                goto exit;
            }
#if defined(OPENSSL)
        }
#endif
    }

#if defined(OPENSSL)
    if (m->c->connect_state == 2) /* SSL connect sent - wait for completion */
    {
        Thread_unlock_mutex(mqttclient_mutex);
        MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
        Thread_lock_mutex(mqttclient_mutex);
        if (rc != 1)
        {
            rc = SOCKET_ERROR;
            goto exit;
        }
        if (!m->c->cleansession && m->c->session == NULL)
            m->c->session = SSL_get1_session(m->c->net.ssl);
        m->c->connect_state = 3; /* SSL connect completed, in which case send the MQTT connect packet */
        if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
        {
            rc = SOCKET_ERROR;
            goto exit;
        }
    }
#endif

    if (m->c->connect_state == 3) /* MQTT connect sent - wait for CONNACK */
    {
        MQTTPacket* pack = NULL;

        Thread_unlock_mutex(mqttclient_mutex);
        pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTClient_elapsed(start));
        Thread_lock_mutex(mqttclient_mutex);
        if (pack == NULL)
            rc = SOCKET_ERROR;
        else
        {
            Connack* connack = (Connack*)pack;
            Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
            if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
            {
                m->c->connected = 1;
                m->c->good = 1;
                m->c->connect_state = 0;
                if (MQTTVersion == 4)
                    sessionPresent = connack->flags.bits.sessionPresent;
                if (m->c->cleansession)
                    rc = MQTTClient_cleanSession(m->c);
                if (m->c->outboundMsgs->count > 0)
                {
                    ListElement* outcurrent = NULL;

                    /* Reset lastTouch on every in-flight message before the retry pass.
                     * The local is named msg so that it does not shadow the client pointer m. */
                    while (ListNextElement(m->c->outboundMsgs, &outcurrent))
                    {
                        Messages* msg = (Messages*)(outcurrent->content);
                        msg->lastTouch = 0;
                    }
                    MQTTProtocol_retry((time_t)0, 1, 1);
                    if (m->c->connected != 1)
                        rc = MQTTCLIENT_DISCONNECTED;
                }
            }
            free(connack);
            m->pack = NULL;
        }
    }

exit:
    if (rc == MQTTCLIENT_SUCCESS)
    {
        /* Struct version 4 and later carry the returned values; testing for
         * exactly 4 would leave them unset for version 5 callers. */
        if (options->struct_version >= 4) /* means we have to fill out return values */
        {
            options->returned.serverURI = serverURI;
            options->returned.MQTTVersion = MQTTVersion;
            options->returned.sessionPresent = sessionPresent;
        }
    }
    else
        MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3)); /* don't want to call connection lost */
    FUNC_EXIT_RC(rc);
    return rc;
}
这个函数里主要是调用了SSLSocket_setSocketForSSL和SSLSocket_connect