A small footprint MQTT library

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
8:83bb166aba73
Parent:
5:34779607059c
Child:
12:30b08cda82fd
--- a/mqtt_codec.c	Fri Aug 26 12:59:16 2016 -0700
+++ b/mqtt_codec.c	Thu Oct 20 17:07:55 2016 -0700
@@ -5,9 +5,11 @@
 #include <limits.h>
 #include "azure_c_shared_utility/gballoc.h"
 #include "azure_c_shared_utility/buffer_.h"
+#include "azure_c_shared_utility/strings.h"
 #include "azure_c_shared_utility/macro_utils.h"
 #include "azure_c_shared_utility/xlogging.h"
 #include "azure_umqtt_c/mqtt_codec.h"
+#include <inttypes.h>
 
 #define PAYLOAD_OFFSET                      5
 #define PACKET_TYPE_BYTE(p)                 ((uint8_t)(((uint8_t)(p)) & 0xf0))
@@ -41,6 +43,9 @@
     CODEC_STATE_VAR_HEADER,     \
     CODEC_STATE_PAYLOAD
 
+static const char* TRUE_CONST = "true";
+static const char* FALSE_CONST = "false";
+
 DEFINE_ENUM(CODEC_STATE_RESULT, CODEC_STATE_VALUES);
 
 typedef struct MQTTCODEC_INSTANCE_TAG
@@ -64,6 +69,20 @@
     QOS_VALUE qualityOfServiceValue;
 } PUBLISH_HEADER_INFO;
 
+static const char* retrieve_qos_value(QOS_VALUE value)
+{
+    switch (value)
+    {
+        case DELIVER_AT_MOST_ONCE:
+            return "DELIVER_AT_MOST_ONCE";
+        case DELIVER_AT_LEAST_ONCE:
+            return "DELIVER_AT_LEAST_ONCE";
+        case DELIVER_EXACTLY_ONCE:
+        default:
+            return "DELIVER_EXACTLY_ONCE";
+    }
+}
+
 void byteutil_writeByte(uint8_t** buffer, uint8_t value)
 {
     if (buffer != NULL)
@@ -105,7 +124,7 @@
     return result;
 }
 
-static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount)
+static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount, STRING_HANDLE trace_log)
 {
     int result = 0;
     if (payloadList == NULL || ctrlPacket == NULL)
@@ -133,12 +152,16 @@
                 iterator += offsetLen;
                 byteutil_writeUTF(&iterator, payloadList[index], (uint16_t)topicLen);
             }
+            if (trace_log != NULL)
+            {
+                STRING_sprintf(trace_log, " | TOPIC_NAME: %s", payloadList[index]);
+            }
         }
     }
     return result;
 }
 
-static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount)
+static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount, STRING_HANDLE trace_log)
 {
     int result = 0;
     if (payloadList == NULL || ctrlPacket == NULL)
@@ -166,13 +189,18 @@
                 iterator += offsetLen;
                 byteutil_writeUTF(&iterator, payloadList[index].subscribeTopic, (uint16_t)topicLen);
                 *iterator = payloadList[index].qosReturn;
+
+                if (trace_log != NULL)
+                {
+                    STRING_sprintf(trace_log, " | TOPIC_NAME: %s | QOS: %d", payloadList[index].subscribeTopic, (int)payloadList[index].qosReturn);
+                }
             }
         }
     }
     return result;
 }
 
-static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions)
+static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
 {
     int result = 0;
     if (BUFFER_enlarge(ctrlPacket, CONNECT_VARIABLE_HEADER_SIZE) != 0)
@@ -188,6 +216,10 @@
         }
         else
         {
+            if (trace_log != NULL)
+            {
+                STRING_sprintf(trace_log, " | VER: %d | KEEPALIVE: %d | FLAGS:", PROTOCOL_NUMBER, mqttOptions->keepAliveInterval);
+            }
             byteutil_writeUTF(&iterator, "MQTT", 4);
             byteutil_writeByte(&iterator, PROTOCOL_NUMBER);
             byteutil_writeByte(&iterator, 0); // Flags will be entered later
@@ -198,7 +230,7 @@
     return result;
 }
 
-static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader)
+static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader, STRING_HANDLE trace_log)
 {
     int result = 0;
     size_t topicLen = 0;
@@ -236,8 +268,16 @@
             iterator += currLen;
             /* The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.It MUST be 792 a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3.*/
             byteutil_writeUTF(&iterator, publishHeader->topicName, (uint16_t)topicLen);
+            if (trace_log != NULL)
+            {
+                STRING_sprintf(trace_log, " | TOPIC_NAME: %s", publishHeader->topicName);
+            }
             if (idLen > 0)
             {
+                if (trace_log != NULL)
+                {
+                    STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, publishHeader->packetId);
+                }
                 byteutil_writeInt(&iterator, publishHeader->packetId);
             }
             result = 0;
@@ -310,7 +350,7 @@
     else
     {
         size_t packetLen = BUFFER_length(ctrlPacket);
-        uint8_t remainSize[4] = { 0 };
+        uint8_t remainSize[4] ={ 0 };
         size_t index = 0;
 
         // Calculate the length of packet
@@ -350,7 +390,7 @@
     return result;
 }
 
-static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions)
+static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
 {
     int result = 0;
     if (mqttOptions == NULL || ctrlPacket == NULL)
@@ -404,7 +444,7 @@
         {
             result = __LINE__;
         }
-        else if ( (willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0) )
+        else if ((willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0))
         {
             result = __LINE__;
         }
@@ -427,8 +467,17 @@
             }
             else
             {
+                STRING_HANDLE connect_payload_trace = NULL;
+                if (trace_log != NULL)
+                {
+                    connect_payload_trace = STRING_new();
+                }
                 if (willMessageLen > 0 && willTopicLen > 0)
                 {
+                    if (trace_log != NULL)
+                    {
+                        (void)STRING_sprintf(connect_payload_trace, " | WILL_TOPIC: %s", mqttOptions->willTopic);
+                    }
                     packet[CONN_FLAG_BYTE_OFFSET] |= WILL_FLAG_FLAG;
                     byteutil_writeUTF(&iterator, mqttOptions->willTopic, (uint16_t)willTopicLen);
                     packet[CONN_FLAG_BYTE_OFFSET] |= mqttOptions->qualityOfServiceValue;
@@ -442,17 +491,35 @@
                 {
                     packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG;
                     byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen);
+                    if (trace_log != NULL)
+                    {
+                        (void)STRING_sprintf(connect_payload_trace, " | USERNAME: %s", mqttOptions->username);
+                    }
                 }
                 if (passwordLen > 0)
                 {
                     packet[CONN_FLAG_BYTE_OFFSET] |= PASSWORD_FLAG;
                     byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen);
+                    if (trace_log != NULL)
+                    {
+                        (void)STRING_sprintf(connect_payload_trace, " | PWD: XXXX");
+                    }
                 }
                 // TODO: Get the rest of the flags
+                if (trace_log != NULL)
+                {
+                    (void)STRING_sprintf(connect_payload_trace, " | CLEAN: %s", mqttOptions->useCleanSession ? "1" : "0");
+                }
                 if (mqttOptions->useCleanSession)
                 {
                     packet[CONN_FLAG_BYTE_OFFSET] |= CLEAN_SESSION_FLAG;
                 }
+                if (trace_log != NULL)
+                {
+                    (void)STRING_sprintf(trace_log, " %zu", packet[CONN_FLAG_BYTE_OFFSET]);
+                    (void)STRING_concat_with_STRING(trace_log, connect_payload_trace);
+                    STRING_delete(connect_payload_trace);
+                }
                 result = 0;
             }
         }
@@ -488,7 +555,7 @@
                     result = __LINE__;
                     break;
                 }
-            } while ( (encodeByte & NEXT_128_CHUNK) != 0);
+            } while ((encodeByte & NEXT_128_CHUNK) != 0);
 
             if (totalLen > 0)
             {
@@ -527,7 +594,7 @@
 MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx)
 {
     MQTTCODEC_HANDLE result;
-    result = malloc(sizeof(MQTTCODEC_INSTANCE) );
+    result = malloc(sizeof(MQTTCODEC_INSTANCE));
     /* Codes_SRS_MQTT_CODEC_07_001: [If a failure is encountered then mqtt_codec_create shall return NULL.] */
     if (result != NULL)
     {
@@ -557,7 +624,7 @@
     }
 }
 
-BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions)
+BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
 {
     BUFFER_HANDLE result;
     /* Codes_SRS_MQTT_CODEC_07_008: [If the parameters mqttOptions is NULL then mqtt_codec_connect shall return a null value.] */
@@ -571,8 +638,13 @@
         result = BUFFER_new();
         if (result != NULL)
         {
+            STRING_HANDLE varible_header_log = NULL;
+            if (trace_log != NULL)
+            {
+                varible_header_log = STRING_new();
+            }
             // Add Variable Header Information
-            if (constructConnectVariableHeader(result, mqttOptions) != 0)
+            if (constructConnectVariableHeader(result, mqttOptions, varible_header_log) != 0)
             {
                 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
                 BUFFER_delete(result);
@@ -580,7 +652,7 @@
             }
             else
             {
-                if (constructConnPayload(result, mqttOptions) != 0)
+                if (constructConnPayload(result, mqttOptions, varible_header_log) != 0)
                 {
                     /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
                     BUFFER_delete(result);
@@ -588,12 +660,27 @@
                 }
                 else
                 {
+                    if (trace_log != NULL)
+                    {
+                        (void)STRING_copy(trace_log, "CONNECT");
+                    }
                     if (constructFixedHeader(result, CONNECT_TYPE, 0) != 0)
                     {
                         /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
                         BUFFER_delete(result);
                         result = NULL;
                     }
+                    else
+                    {
+                        if (trace_log != NULL)
+                        {
+                            (void)STRING_concat_with_STRING(trace_log, varible_header_log);
+                        }
+                    }
+                }
+                if (varible_header_log != NULL)
+                {
+                    STRING_delete(varible_header_log);
                 }
             }
         }
@@ -632,7 +719,7 @@
     return result;
 }
 
-BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen)
+BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen, STRING_HANDLE trace_log)
 {
     BUFFER_HANDLE result;
     /* Codes_SRS_MQTT_CODEC_07_005: [If the parameters topicName is NULL then mqtt_codec_publish shall return NULL.] */
@@ -648,7 +735,7 @@
     }
     else
     {
-        PUBLISH_HEADER_INFO publishInfo = { 0 };
+        PUBLISH_HEADER_INFO publishInfo ={ 0 };
         publishInfo.topicName = topicName;
         publishInfo.packetId = packetId;
         publishInfo.qualityOfServiceValue = qosValue;
@@ -672,7 +759,15 @@
         result = BUFFER_new();
         if (result != NULL)
         {
-            if (constructPublishVariableHeader(result, &publishInfo) != 0)
+            STRING_HANDLE varible_header_log = NULL;
+            if (trace_log != NULL)
+            {
+                varible_header_log = STRING_construct_sprintf(" | IS_DUP: %s | RETAIN: %d | QOS: %s", duplicateMsg ? TRUE_CONST : FALSE_CONST,
+                    serverRetain ? 1 : 0,
+                    retrieve_qos_value(publishInfo.qualityOfServiceValue) );
+            }
+
+            if (constructPublishVariableHeader(result, &publishInfo, varible_header_log) != 0)
             {
                 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
                 BUFFER_delete(result);
@@ -703,20 +798,39 @@
                             iterator += payloadOffset;
                             // Write Message
                             memcpy(iterator, msgBuffer, buffLen);
+                            if (trace_log)
+                            {
+                                STRING_sprintf(varible_header_log, " | PAYLOAD_LEN: %zu", buffLen);
+                            }
                         }
                     }
                 }
 
                 if (result != NULL)
                 {
+                    if (trace_log != NULL)
+                    {
+                        (void)STRING_copy(trace_log, "PUBLISH");
+                    }
                     if (constructFixedHeader(result, PUBLISH_TYPE, headerFlags) != 0)
                     {
                         /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
                         BUFFER_delete(result);
                         result = NULL;
                     }
+                    else
+                    {
+                        if (trace_log != NULL)
+                        {
+                            (void)STRING_concat_with_STRING(trace_log, varible_header_log);
+                        }
+                    }
                 }
             }
+            if (varible_header_log != NULL)
+            {
+                STRING_delete(varible_header_log);
+            }
         }
     }
     return result;
@@ -785,7 +899,7 @@
     return result;
 }
 
-BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
+BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count, STRING_HANDLE trace_log)
 {
     BUFFER_HANDLE result;
     /* Codes_SRS_MQTT_CODEC_07_023: [If the parameters subscribeList is NULL or if count is 0 then mqtt_codec_subscribe shall return NULL.] */
@@ -807,8 +921,13 @@
             }
             else
             {
+                STRING_HANDLE sub_trace = NULL;
+                if (trace_log != NULL)
+                {
+                    sub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
+                }
                 /* Codes_SRS_MQTT_CODEC_07_024: [mqtt_codec_subscribe shall iterate through count items in the subscribeList.] */
-                if (addListItemsToSubscribePacket(result, subscribeList, count) != 0)
+                if (addListItemsToSubscribePacket(result, subscribeList, count, sub_trace) != 0)
                 {
                     /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
                     BUFFER_delete(result);
@@ -816,12 +935,28 @@
                 }
                 else
                 {
+
+                    if (trace_log != NULL)
+                    {
+                        STRING_concat(trace_log, "SUBSCRIBE");
+                    }
                     if (constructFixedHeader(result, SUBSCRIBE_TYPE, SUBSCRIBE_FIXED_HEADER_FLAG) != 0)
                     {
                         /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
                         BUFFER_delete(result);
                         result = NULL;
                     }
+                    else
+                    {
+                        if (trace_log != NULL)
+                        {
+                            (void)STRING_concat_with_STRING(trace_log, sub_trace);
+                        }
+                    }
+                }
+                if (sub_trace != NULL)
+                {
+                    STRING_delete(sub_trace);
                 }
             }
         }
@@ -829,7 +964,7 @@
     return result;
 }
 
-BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count)
+BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count, STRING_HANDLE trace_log)
 {
     BUFFER_HANDLE result;
     /* Codes_SRS_MQTT_CODEC_07_027: [If the parameters unsubscribeList is NULL or if count is 0 then mqtt_codec_unsubscribe shall return NULL.] */
@@ -851,8 +986,13 @@
             }
             else
             {
+                STRING_HANDLE unsub_trace = NULL;
+                if (trace_log != NULL)
+                {
+                    unsub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
+                }
                 /* Codes_SRS_MQTT_CODEC_07_028: [mqtt_codec_unsubscribe shall iterate through count items in the unsubscribeList.] */
-                if (addListItemsToUnsubscribePacket(result, unsubscribeList, count) != 0)
+                if (addListItemsToUnsubscribePacket(result, unsubscribeList, count, unsub_trace) != 0)
                 {
                     /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
                     BUFFER_delete(result);
@@ -860,12 +1000,27 @@
                 }
                 else
                 {
+                    if (trace_log != NULL)
+                    {
+                        (void)STRING_copy(trace_log, "UNSUBSCRIBE");
+                    }
                     if (constructFixedHeader(result, UNSUBSCRIBE_TYPE, UNSUBSCRIBE_FIXED_HEADER_FLAG) != 0)
                     {
                         /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
                         BUFFER_delete(result);
                         result = NULL;
                     }
+                    else
+                    {
+                        if (trace_log != NULL)
+                        {
+                            (void)STRING_concat_with_STRING(trace_log, unsub_trace);
+                        }
+                    }
+                }
+                if (unsub_trace != NULL)
+                {
+                    STRING_delete(unsub_trace);
                 }
             }
         }