A small footprint MQTT library
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: mqtt_codec.c
- Revision:
- 8:83bb166aba73
- Parent:
- 5:34779607059c
- Child:
- 12:30b08cda82fd
--- a/mqtt_codec.c Fri Aug 26 12:59:16 2016 -0700 +++ b/mqtt_codec.c Thu Oct 20 17:07:55 2016 -0700 @@ -5,9 +5,11 @@ #include <limits.h> #include "azure_c_shared_utility/gballoc.h" #include "azure_c_shared_utility/buffer_.h" +#include "azure_c_shared_utility/strings.h" #include "azure_c_shared_utility/macro_utils.h" #include "azure_c_shared_utility/xlogging.h" #include "azure_umqtt_c/mqtt_codec.h" +#include <inttypes.h> #define PAYLOAD_OFFSET 5 #define PACKET_TYPE_BYTE(p) ((uint8_t)(((uint8_t)(p)) & 0xf0)) @@ -41,6 +43,9 @@ CODEC_STATE_VAR_HEADER, \ CODEC_STATE_PAYLOAD +static const char* TRUE_CONST = "true"; +static const char* FALSE_CONST = "false"; + DEFINE_ENUM(CODEC_STATE_RESULT, CODEC_STATE_VALUES); typedef struct MQTTCODEC_INSTANCE_TAG @@ -64,6 +69,20 @@ QOS_VALUE qualityOfServiceValue; } PUBLISH_HEADER_INFO; +static const char* retrieve_qos_value(QOS_VALUE value) +{ + switch (value) + { + case DELIVER_AT_MOST_ONCE: + return "DELIVER_AT_MOST_ONCE"; + case DELIVER_AT_LEAST_ONCE: + return "DELIVER_AT_LEAST_ONCE"; + case DELIVER_EXACTLY_ONCE: + default: + return "DELIVER_EXACTLY_ONCE"; + } +} + void byteutil_writeByte(uint8_t** buffer, uint8_t value) { if (buffer != NULL) @@ -105,7 +124,7 @@ return result; } -static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount) +static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount, STRING_HANDLE trace_log) { int result = 0; if (payloadList == NULL || ctrlPacket == NULL) @@ -133,12 +152,16 @@ iterator += offsetLen; byteutil_writeUTF(&iterator, payloadList[index], (uint16_t)topicLen); } + if (trace_log != NULL) + { + STRING_sprintf(trace_log, " | TOPIC_NAME: %s", payloadList[index]); + } } } return result; } -static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount) +static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount, STRING_HANDLE trace_log) { int result = 0; if (payloadList == NULL || ctrlPacket == NULL) @@ -166,13 +189,18 @@ iterator += offsetLen; byteutil_writeUTF(&iterator, payloadList[index].subscribeTopic, (uint16_t)topicLen); *iterator = payloadList[index].qosReturn; + + if (trace_log != NULL) + { + STRING_sprintf(trace_log, " | TOPIC_NAME: %s | QOS: %d", payloadList[index].subscribeTopic, (int)payloadList[index].qosReturn); + } } } } return result; } -static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions) +static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log) { int result = 0; if (BUFFER_enlarge(ctrlPacket, CONNECT_VARIABLE_HEADER_SIZE) != 0) @@ -188,6 +216,10 @@ } else { + if (trace_log != NULL) + { + STRING_sprintf(trace_log, " | VER: %d | KEEPALIVE: %d | FLAGS:", PROTOCOL_NUMBER, mqttOptions->keepAliveInterval); + } byteutil_writeUTF(&iterator, "MQTT", 4); byteutil_writeByte(&iterator, PROTOCOL_NUMBER); byteutil_writeByte(&iterator, 0); // Flags will be entered later @@ -198,7 +230,7 @@ return result; } -static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader) +static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader, STRING_HANDLE trace_log) { int result = 0; size_t topicLen = 0; @@ -236,8 +268,16 @@ iterator += currLen; /* The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.It MUST be 792 a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3.*/ byteutil_writeUTF(&iterator, publishHeader->topicName, (uint16_t)topicLen); + if (trace_log != NULL) + { + STRING_sprintf(trace_log, " | TOPIC_NAME: %s", publishHeader->topicName); + } if (idLen > 0) { + if (trace_log != NULL) + { + STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, publishHeader->packetId); + } byteutil_writeInt(&iterator, publishHeader->packetId); } result = 0; @@ -310,7 +350,7 @@ else { size_t packetLen = BUFFER_length(ctrlPacket); - uint8_t remainSize[4] = { 0 }; + uint8_t remainSize[4] ={ 0 }; size_t index = 0; // Calculate the length of packet @@ -350,7 +390,7 @@ return result; } -static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions) +static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log) { int result = 0; if (mqttOptions == NULL || ctrlPacket == NULL) @@ -404,7 +444,7 @@ { result = __LINE__; } - else if ( (willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0) ) + else if ((willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0)) { result = __LINE__; } @@ -427,8 +467,17 @@ } else { + STRING_HANDLE connect_payload_trace = NULL; + if (trace_log != NULL) + { + connect_payload_trace = STRING_new(); + } if (willMessageLen > 0 && willTopicLen > 0) { + if (trace_log != NULL) + { + (void)STRING_sprintf(connect_payload_trace, " | WILL_TOPIC: %s", mqttOptions->willTopic); + } packet[CONN_FLAG_BYTE_OFFSET] |= WILL_FLAG_FLAG; byteutil_writeUTF(&iterator, mqttOptions->willTopic, (uint16_t)willTopicLen); packet[CONN_FLAG_BYTE_OFFSET] |= mqttOptions->qualityOfServiceValue; @@ -442,17 +491,35 @@ { packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG; byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen); + if (trace_log != NULL) + { + (void)STRING_sprintf(connect_payload_trace, " | USERNAME: %s", mqttOptions->username); + } } if (passwordLen > 0) { packet[CONN_FLAG_BYTE_OFFSET] |= PASSWORD_FLAG; byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen); + if (trace_log != NULL) + { + (void)STRING_sprintf(connect_payload_trace, " | PWD: XXXX"); + } } // TODO: Get the rest of the flags + if (trace_log != NULL) + { + (void)STRING_sprintf(connect_payload_trace, " | CLEAN: %s", mqttOptions->useCleanSession ? "1" : "0"); + } if (mqttOptions->useCleanSession) { packet[CONN_FLAG_BYTE_OFFSET] |= CLEAN_SESSION_FLAG; } + if (trace_log != NULL) + { + (void)STRING_sprintf(trace_log, " %zu", packet[CONN_FLAG_BYTE_OFFSET]); + (void)STRING_concat_with_STRING(trace_log, connect_payload_trace); + STRING_delete(connect_payload_trace); + } result = 0; } } @@ -488,7 +555,7 @@ result = __LINE__; break; } - } while ( (encodeByte & NEXT_128_CHUNK) != 0); + } while ((encodeByte & NEXT_128_CHUNK) != 0); if (totalLen > 0) { @@ -527,7 +594,7 @@ MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx) { MQTTCODEC_HANDLE result; - result = malloc(sizeof(MQTTCODEC_INSTANCE) ); + result = malloc(sizeof(MQTTCODEC_INSTANCE)); /* Codes_SRS_MQTT_CODEC_07_001: [If a failure is encountered then mqtt_codec_create shall return NULL.] */ if (result != NULL) { @@ -557,7 +624,7 @@ } } -BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions) +BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log) { BUFFER_HANDLE result; /* Codes_SRS_MQTT_CODEC_07_008: [If the parameters mqttOptions is NULL then mqtt_codec_connect shall return a null value.] */ @@ -571,8 +638,13 @@ result = BUFFER_new(); if (result != NULL) { + STRING_HANDLE varible_header_log = NULL; + if (trace_log != NULL) + { + varible_header_log = STRING_new(); + } // Add Variable Header Information - if (constructConnectVariableHeader(result, mqttOptions) != 0) + if (constructConnectVariableHeader(result, mqttOptions, varible_header_log) != 0) { /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */ BUFFER_delete(result); @@ -580,7 +652,7 @@ } else { - if (constructConnPayload(result, mqttOptions) != 0) + if (constructConnPayload(result, mqttOptions, varible_header_log) != 0) { /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */ BUFFER_delete(result); @@ -588,12 +660,27 @@ } else { + if (trace_log != NULL) + { + (void)STRING_copy(trace_log, "CONNECT"); + } if (constructFixedHeader(result, CONNECT_TYPE, 0) != 0) { /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */ BUFFER_delete(result); result = NULL; } + else + { + if (trace_log != NULL) + { + (void)STRING_concat_with_STRING(trace_log, varible_header_log); + } + } + } + if (varible_header_log != NULL) + { + STRING_delete(varible_header_log); } } } @@ -632,7 +719,7 @@ return result; } -BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen) +BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen, STRING_HANDLE trace_log) { BUFFER_HANDLE result; /* Codes_SRS_MQTT_CODEC_07_005: [If the parameters topicName is NULL then mqtt_codec_publish shall return NULL.] */ @@ -648,7 +735,7 @@ } else { - PUBLISH_HEADER_INFO publishInfo = { 0 }; + PUBLISH_HEADER_INFO publishInfo ={ 0 }; publishInfo.topicName = topicName; publishInfo.packetId = packetId; publishInfo.qualityOfServiceValue = qosValue; @@ -672,7 +759,15 @@ result = BUFFER_new(); if (result != NULL) { - if (constructPublishVariableHeader(result, &publishInfo) != 0) + STRING_HANDLE varible_header_log = NULL; + if (trace_log != NULL) + { + varible_header_log = STRING_construct_sprintf(" | IS_DUP: %s | RETAIN: %d | QOS: %s", duplicateMsg ? TRUE_CONST : FALSE_CONST, + serverRetain ? 1 : 0, + retrieve_qos_value(publishInfo.qualityOfServiceValue) ); + } + + if (constructPublishVariableHeader(result, &publishInfo, varible_header_log) != 0) { /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */ BUFFER_delete(result); @@ -703,20 +798,39 @@ iterator += payloadOffset; // Write Message memcpy(iterator, msgBuffer, buffLen); + if (trace_log) + { + STRING_sprintf(varible_header_log, " | PAYLOAD_LEN: %zu", buffLen); + } } } } if (result != NULL) { + if (trace_log != NULL) + { + (void)STRING_copy(trace_log, "PUBLISH"); + } if (constructFixedHeader(result, PUBLISH_TYPE, headerFlags) != 0) { /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */ BUFFER_delete(result); result = NULL; } + else + { + if (trace_log != NULL) + { + (void)STRING_concat_with_STRING(trace_log, varible_header_log); + } + } } } + if (varible_header_log != NULL) + { + STRING_delete(varible_header_log); + } } } return result; @@ -785,7 +899,7 @@ return result; } -BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count) +BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count, STRING_HANDLE trace_log) { BUFFER_HANDLE result; /* Codes_SRS_MQTT_CODEC_07_023: [If the parameters subscribeList is NULL or if count is 0 then mqtt_codec_subscribe shall return NULL.] */ @@ -807,8 +921,13 @@ } else { + STRING_HANDLE sub_trace = NULL; + if (trace_log != NULL) + { + sub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId); + } /* Codes_SRS_MQTT_CODEC_07_024: [mqtt_codec_subscribe shall iterate through count items in the subscribeList.] */ - if (addListItemsToSubscribePacket(result, subscribeList, count) != 0) + if (addListItemsToSubscribePacket(result, subscribeList, count, sub_trace) != 0) { /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */ BUFFER_delete(result); @@ -816,12 +935,28 @@ } else { + + if (trace_log != NULL) + { + STRING_concat(trace_log, "SUBSCRIBE"); + } if (constructFixedHeader(result, SUBSCRIBE_TYPE, SUBSCRIBE_FIXED_HEADER_FLAG) != 0) { /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */ BUFFER_delete(result); result = NULL; } + else + { + if (trace_log != NULL) + { + (void)STRING_concat_with_STRING(trace_log, sub_trace); + } + } + } + if (sub_trace != NULL) + { + STRING_delete(sub_trace); } } } @@ -829,7 +964,7 @@ return result; } -BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count) +BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count, STRING_HANDLE trace_log) { BUFFER_HANDLE result; /* Codes_SRS_MQTT_CODEC_07_027: [If the parameters unsubscribeList is NULL or if count is 0 then mqtt_codec_unsubscribe shall return NULL.] */ @@ -851,8 +986,13 @@ } else { + STRING_HANDLE unsub_trace = NULL; + if (trace_log != NULL) + { + unsub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId); + } /* Codes_SRS_MQTT_CODEC_07_028: [mqtt_codec_unsubscribe shall iterate through count items in the unsubscribeList.] */ - if (addListItemsToUnsubscribePacket(result, unsubscribeList, count) != 0) + if (addListItemsToUnsubscribePacket(result, unsubscribeList, count, unsub_trace) != 0) { /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */ BUFFER_delete(result); @@ -860,12 +1000,27 @@ } else { + if (trace_log != NULL) + { + (void)STRING_copy(trace_log, "UNSUBSCRIBE"); + } if (constructFixedHeader(result, UNSUBSCRIBE_TYPE, UNSUBSCRIBE_FIXED_HEADER_FLAG) != 0) { /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */ BUFFER_delete(result); result = NULL; } + else + { + if (trace_log != NULL) + { + (void)STRING_concat_with_STRING(trace_log, unsub_trace); + } + } + } + if (unsub_trace != NULL) + { + STRING_delete(unsub_trace); } } }