A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.cpp
- Revision:
- 33:38e2e7bf91eb
- Parent:
- 30:b2aed80037db
--- a/MQTTThreadedClient.cpp Sat Apr 01 12:55:49 2017 +0000 +++ b/MQTTThreadedClient.cpp Sun Apr 02 09:11:53 2017 +0800 @@ -196,7 +196,7 @@ int ret; /* Start the handshake, the rest will be done in onReceive() */ - DBG("Starting the TLS handshake...\r\n"); + printf("Starting the TLS handshake...\r\n"); ret = mbedtls_ssl_handshake(&_ssl); if (ret < 0) { @@ -212,7 +212,7 @@ } /* Handshake done, time to print info */ - DBG("TLS connection to %s:%d established\r\n", + printf("TLS connection to %s:%d established\r\n", host.c_str(), port); const uint32_t buf_size = 1024; @@ -220,23 +220,24 @@ mbedtls_x509_crt_info(buf, buf_size, "\r ", mbedtls_ssl_get_peer_cert(&_ssl)); - DBG("Server certificate:\r\n%s\r", buf); + printf("Server certificate:\r\n%s\r", buf); // Verify server cert ... uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); if( flags != 0 ) { mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags); - DBG("Certificate verification failed:\r\n%s\r\r\n", buf); + printf("Certificate verification failed:\r\n%s\r\r\n", buf); // free server cert ... before error return delete [] buf; return -1; } - DBG("Certificate verification passed\r\n\r\n"); + printf("Certificate verification passed\r\n\r\n"); // delete server cert after verification delete [] buf; #if defined(MBEDTLS_SSL_CLI_C) + printf("Saving SSL/TLS session ...\r\n"); // TODO: Save the session here for reconnect. if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) { @@ -244,8 +245,9 @@ hasSavedSession = false; return -1; } + printf("Session saved for reconnect ...\r\n"); #endif - DBG("Session saved for reconnect ...\r\n"); + hasSavedSession = true; return 0; @@ -584,7 +586,7 @@ *message = msg; // Push the data to the thread - DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid()); + DBG("Pushing data to consumer thread ...\r\n"); mqueue.put(message); return SUCCESS; @@ -596,7 +598,7 @@ if (!isConnected) { - DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid()); + DBG("Not connected!!! ...\r\n"); return FAILURE; } @@ -605,17 +607,17 @@ topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); if (len <= 0) { - DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid()); + DBG("Failed serializing message ...\r\n"); return FAILURE; } if (sendPacket(len) == SUCCESS) { - DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid()); + DBG("Successfully sent publish packet to server ...\r\n"); return SUCCESS; } - DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid()); + DBG("Failed to send publish packet to server ...\r\n"); return FAILURE; } @@ -719,7 +721,7 @@ MQTTString topicName = MQTTString_initializer; Message msg; int intQoS; - DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid()); + DBG("Deserializing publish message ...\r\n"); if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, @@ -728,7 +730,7 @@ (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) { - DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid()); + DBG("Error deserializing published message ...\r\n"); return -1; } @@ -739,7 +741,7 @@ }else topic = (const char *) topicName.cstring; - DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS); + DBG("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS); msg.qos = (QoS) intQoS; @@ -750,7 +752,7 @@ // Call the callback function if (topicCBMap[topic].attached()) { - DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid()); + DBG("Invoking function handler for topic ...\r\n"); MessageData md(topicName, msg); topicCBMap[topic](md); @@ -805,7 +807,7 @@ int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet { - DBG("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid()); + DBG("Ping request sent successfully ...\r\n"); } } @@ -852,7 +854,7 @@ case BUFFER_OVERFLOW: { // TODO: Network error, do we disconnect and reconnect? - DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid()); + DBG("Failure or buffer overflow problem ... \r\n"); MBED_ASSERT(false); } break; @@ -866,22 +868,22 @@ break; case PUBLISH: { - DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid()); + DBG("Publish received!....\r\n"); // We receive data from the MQTT server .. if (handlePublishMsg() < 0) { - DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid()); + DBG("Error handling PUBLISH message ... \r\n"); break; } } break; case PINGRESP: { - DBG("[Thread:%d]Got ping response ...\r\n", Thread::gettid()); + DBG("Got ping response ...\r\n"); resetConnectionTimer(); } break; default: - DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType); + DBG("Unknown/Not handled message from server pType[%d]\r\n", pType); } // Check if its time to send a keepAlive packet @@ -895,7 +897,7 @@ osEvent evt = mqueue.get(10); if (evt.status == osEventMessage) { - DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid()); + DBG("Got message to publish! ... \r\n"); // Unpack the message PubMessage * message = (PubMessage *)evt.value.p;