A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Revision:
33:38e2e7bf91eb
Parent:
30:b2aed80037db
--- a/MQTTThreadedClient.cpp	Sat Apr 01 12:55:49 2017 +0000
+++ b/MQTTThreadedClient.cpp	Sun Apr 02 09:11:53 2017 +0800
@@ -196,7 +196,7 @@
         int ret;
         
         /* Start the handshake, the rest will be done in onReceive() */
-        DBG("Starting the TLS handshake...\r\n");
+        printf("Starting the TLS handshake...\r\n");
         ret = mbedtls_ssl_handshake(&_ssl);
         if (ret < 0) 
         {
@@ -212,7 +212,7 @@
         }
 
         /* Handshake done, time to print info */
-        DBG("TLS connection to %s:%d established\r\n", 
+        printf("TLS connection to %s:%d established\r\n", 
             host.c_str(), port);
 
         const uint32_t buf_size = 1024;
@@ -220,23 +220,24 @@
         mbedtls_x509_crt_info(buf, buf_size, "\r    ",
                         mbedtls_ssl_get_peer_cert(&_ssl));
                         
-        DBG("Server certificate:\r\n%s\r", buf);
+        printf("Server certificate:\r\n%s\r", buf);
         // Verify server cert ...
         uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
         if( flags != 0 )
         {
             mbedtls_x509_crt_verify_info(buf, buf_size, "\r  ! ", flags);
-            DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
+            printf("Certificate verification failed:\r\n%s\r\r\n", buf);
             // free server cert ... before error return
             delete [] buf;
             return -1;
         }
         
-        DBG("Certificate verification passed\r\n\r\n");
+        printf("Certificate verification passed\r\n\r\n");
         // delete server cert after verification
         delete [] buf;
         
 #if defined(MBEDTLS_SSL_CLI_C)        
+				printf("Saving SSL/TLS session ...\r\n");
         // TODO: Save the session here for reconnect.
         if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
         {
@@ -244,8 +245,9 @@
             hasSavedSession = false;
             return -1;
         }  
+        printf("Session saved for reconnect ...\r\n");				
 #endif        
-        DBG("Session saved for reconnect ...\r\n");     
+     
         hasSavedSession = true;
         
         return 0;
@@ -584,7 +586,7 @@
     *message = msg;
     
     // Push the data to the thread
-    DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
+    DBG("Pushing data to consumer thread ...\r\n");
     mqueue.put(message);
     
     return SUCCESS;
@@ -596,7 +598,7 @@
      
      if (!isConnected) 
      {
-        DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
+        DBG("Not connected!!! ...\r\n");
         return FAILURE;
      }
         
@@ -605,17 +607,17 @@
               topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
      if (len <= 0)
      {
-         DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
+         DBG("Failed serializing message ...\r\n");
          return FAILURE;
      }
      
      if (sendPacket(len) == SUCCESS)
      {
-         DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
+         DBG("Successfully sent publish packet to server ...\r\n");
          return SUCCESS;
      }
     
-    DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
+    DBG("Failed to send publish packet to server ...\r\n");
     return FAILURE;
 }
 
@@ -719,7 +721,7 @@
     MQTTString topicName = MQTTString_initializer;
     Message msg;
     int intQoS;
-    DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
+    DBG("Deserializing publish message ...\r\n");
     if (MQTTDeserialize_publish((unsigned char*)&msg.dup, 
             &intQoS, 
             (unsigned char*)&msg.retained, 
@@ -728,7 +730,7 @@
             (unsigned char**)&msg.payload, 
             (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
     {
-        DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
+        DBG("Error deserializing published message ...\r\n");
         return -1;
     }
 
@@ -739,7 +741,7 @@
     }else
         topic = (const char *) topicName.cstring;
     
-    DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
+    DBG("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS);
     
     msg.qos = (QoS) intQoS;
 
@@ -750,7 +752,7 @@
         // Call the callback function 
         if (topicCBMap[topic].attached())
         {
-            DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
+            DBG("Invoking function handler for topic ...\r\n");
             MessageData md(topicName, msg);            
             topicCBMap[topic](md);
             
@@ -805,7 +807,7 @@
     int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
     if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
     {
-        DBG("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
+        DBG("Ping request sent successfully ...\r\n");
     }
 }
 
@@ -852,7 +854,7 @@
                 case BUFFER_OVERFLOW: 
                     {
                         // TODO: Network error, do we disconnect and reconnect?
-                        DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
+                        DBG("Failure or buffer overflow problem ... \r\n");
                         MBED_ASSERT(false);
                     }
                     break;
@@ -866,22 +868,22 @@
                     break;
                 case PUBLISH: 
                     {
-                        DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
+                        DBG("Publish received!....\r\n");
                         // We receive data from the MQTT server ..
                         if (handlePublishMsg() < 0) {
-                            DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
+                            DBG("Error handling PUBLISH message ... \r\n");
                             break;
                         }
                     }
                     break;
                 case PINGRESP: 
                     {
-                        DBG("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
+                        DBG("Got ping response ...\r\n");
                         resetConnectionTimer();
                     }
                     break;
                 default:
-                    DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
+                    DBG("Unknown/Not handled message from server pType[%d]\r\n", pType);
             }
 
             // Check if its time to send a keepAlive packet
@@ -895,7 +897,7 @@
             osEvent evt = mqueue.get(10);
             if (evt.status == osEventMessage) {
 
-                DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
+                DBG("Got message to publish! ... \r\n");
 
                 // Unpack the message
                 PubMessage * message = (PubMessage *)evt.value.p;