Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: message_queue.c
- Revision:
- 56:8704100b3b54
- Parent:
- 53:e21e1e88460f
--- a/message_queue.c Thu Jul 12 18:08:04 2018 -0700 +++ b/message_queue.c Tue Sep 11 11:11:47 2018 -0700 @@ -6,7 +6,7 @@ #include "azure_c_shared_utility/optimize_size.h" #include "azure_c_shared_utility/crt_abstractions.h" #include "azure_c_shared_utility/gballoc.h" -#include "azure_c_shared_utility/agenttime.h" +#include "azure_c_shared_utility/agenttime.h" #include "azure_c_shared_utility/xlogging.h" #include "azure_c_shared_utility/singlylinkedlist.h" @@ -79,7 +79,7 @@ { int result; MESSAGE_QUEUE_ITEM* mq_item; - + mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item); if (singlylinkedlist_remove(message_queue->in_progress, list_item)) @@ -109,7 +109,7 @@ { LogError("failed removing message from list (%p)", list); } - + // Codes_SRS_MESSAGE_QUEUE_09_049: [Otherwise `mq_item->on_message_processing_completed_callback` shall be invoked passing `mq_item->message`, `result`, `reason` and `mq_item->user_context`] fire_message_callback(mq_item, result, reason); @@ -127,7 +127,7 @@ else { LIST_ITEM_HANDLE list_item; - + if ((list_item = singlylinkedlist_find(message_queue->in_progress, find_item_by_message_ptr, message)) == NULL) { // Codes_SRS_MESSAGE_QUEUE_09_044: [If `message` is not present in `message_queue->in_progress`, it shall be ignored] @@ -136,7 +136,7 @@ else { MESSAGE_QUEUE_ITEM* mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item); - + // Codes_SRS_MESSAGE_QUEUE_09_047: [If `result` is MESSAGE_QUEUE_RETRYABLE_ERROR and `mq_item->number_of_attempts` is less than or equal `message_queue->max_retry_count`, the `message` shall be moved to `message_queue->pending` to be re-sent] // Codes_SRS_MESSAGE_QUEUE_09_048: [If `result` is MESSAGE_QUEUE_RETRYABLE_ERROR and `mq_item->number_of_attempts` is greater than `message_queue->max_retry_count`, result shall be changed to MESSAGE_QUEUE_ERROR] if (!should_retry_sending(message_queue, mq_item, result) || retry_sending_message(message_queue, list_item) != RESULT_OK) @@ -309,8 +309,8 @@ LogError("invalid argument (name=%p, value=%p)", name, value); result = NULL; } - else if (strcmp(SAVED_OPTION_MAX_ENQUEUE_TIME_SECS, name) == 0 || - strcmp(SAVED_OPTION_MAX_PROCESSING_TIME_SECS, name) == 0 || + else if (strcmp(SAVED_OPTION_MAX_ENQUEUE_TIME_SECS, name) == 0 || + strcmp(SAVED_OPTION_MAX_PROCESSING_TIME_SECS, name) == 0 || strcmp(SAVED_OPTION_MAX_RETRY_COUNT, name) == 0) { if ((result = malloc(sizeof(size_t))) == NULL) @@ -337,8 +337,8 @@ { LogError("invalid argument (name=%p, value=%p)", name, value); } - else if (strcmp(SAVED_OPTION_MAX_ENQUEUE_TIME_SECS, name) == 0 || - strcmp(SAVED_OPTION_MAX_PROCESSING_TIME_SECS, name) == 0 || + else if (strcmp(SAVED_OPTION_MAX_ENQUEUE_TIME_SECS, name) == 0 || + strcmp(SAVED_OPTION_MAX_PROCESSING_TIME_SECS, name) == 0 || strcmp(SAVED_OPTION_MAX_RETRY_COUNT, name) == 0) { free((void*)value); @@ -360,18 +360,18 @@ { LIST_ITEM_HANDLE list_item; - // Codes_SRS_MESSAGE_QUEUE_09_027: [Each `mq_item` in `message_queue->pending` and `message_queue->in_progress` lists shall be removed] + // Codes_SRS_MESSAGE_QUEUE_09_027: [Each `mq_item` in `message_queue->pending` and `message_queue->in_progress` lists shall be removed] while ((list_item = singlylinkedlist_get_head_item(message_queue->in_progress)) != NULL) { // Codes_SRS_MESSAGE_QUEUE_09_028: [`message_queue->on_message_processing_completed_callback` shall be invoked with MESSAGE_QUEUE_CANCELLED for each `mq_item` removed] - // Codes_SRS_MESSAGE_QUEUE_09_029: [Each `mq_item` shall be freed] + // Codes_SRS_MESSAGE_QUEUE_09_029: [Each `mq_item` shall be freed] dequeue_message_and_fire_callback(message_queue->in_progress, list_item, MESSAGE_QUEUE_CANCELLED, NULL); } while ((list_item = singlylinkedlist_get_head_item(message_queue->pending)) != NULL) { // Codes_SRS_MESSAGE_QUEUE_09_028: [`message_queue->on_message_processing_completed_callback` shall be invoked with MESSAGE_QUEUE_CANCELLED for each `mq_item` removed] - // Codes_SRS_MESSAGE_QUEUE_09_029: [Each `mq_item` shall be freed] + // Codes_SRS_MESSAGE_QUEUE_09_029: [Each `mq_item` shall be freed] dequeue_message_and_fire_callback(message_queue->pending, list_item, MESSAGE_QUEUE_CANCELLED, NULL); } } @@ -468,7 +468,7 @@ } } - singlylinkedlist_destroy(temp_list); + singlylinkedlist_destroy(temp_list); } } @@ -493,7 +493,7 @@ { singlylinkedlist_destroy(message_queue->in_progress); } - + free(message_queue); } } @@ -810,4 +810,4 @@ // Codes_SRS_MESSAGE_QUEUE_09_068: [If no failures occur, message_queue_retrieve_options shall return the OPTIONHANDLER_HANDLE instance] return result; -} \ No newline at end of file +}