Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransport_amqp_twin_messenger.c
- Revision:
- 56:8704100b3b54
- Parent:
- 54:830550fef7ea
--- a/iothubtransport_amqp_twin_messenger.c Thu Jul 12 18:08:04 2018 -0700 +++ b/iothubtransport_amqp_twin_messenger.c Tue Sep 11 11:11:47 2018 -0700 @@ -7,7 +7,7 @@ #include "azure_c_shared_utility/optimize_size.h" #include "azure_c_shared_utility/crt_abstractions.h" #include "azure_c_shared_utility/gballoc.h" -#include "azure_c_shared_utility/agenttime.h" +#include "azure_c_shared_utility/agenttime.h" #include "azure_c_shared_utility/xlogging.h" #include "azure_c_shared_utility/uniqueid.h" #include "azure_c_shared_utility/singlylinkedlist.h" @@ -27,56 +27,56 @@ #define RESULT_OK 0 #define INDEFINITE_TIME ((time_t)(-1)) -#define CLIENT_VERSION_PROPERTY_NAME "com.microsoft:client-version" +#define CLIENT_VERSION_PROPERTY_NAME "com.microsoft:client-version" #define UNIQUE_ID_BUFFER_SIZE 37 #define EMPTY_TWIN_BODY_DATA ((const unsigned char*)" ") #define EMPTY_TWIN_BODY_SIZE 1 -#define TWIN_MESSAGE_PROPERTY_OPERATION "operation" -#define TWIN_MESSAGE_PROPERTY_RESOURCE "resource" -#define TWIN_MESSAGE_PROPERTY_VERSION "version" -#define TWIN_MESSAGE_PROPERTY_STATUS "status" +#define TWIN_MESSAGE_PROPERTY_OPERATION "operation" +#define TWIN_MESSAGE_PROPERTY_RESOURCE "resource" +#define TWIN_MESSAGE_PROPERTY_VERSION "version" +#define TWIN_MESSAGE_PROPERTY_STATUS "status" -#define TWIN_RESOURCE_DESIRED "/notifications/twin/properties/desired" -#define TWIN_RESOURCE_REPORTED "/properties/reported" +#define TWIN_RESOURCE_DESIRED "/notifications/twin/properties/desired" +#define TWIN_RESOURCE_REPORTED "/properties/reported" -#define TWIN_CORRELATION_ID_PROPERTY_NAME "com.microsoft:channel-correlation-id" -#define TWIN_API_VERSION_PROPERTY_NAME "com.microsoft:api-version" -#define TWIN_CORRELATION_ID_PROPERTY_FORMAT "twin:%s" -#define TWIN_API_VERSION_NUMBER "2016-11-14" +#define TWIN_CORRELATION_ID_PROPERTY_NAME "com.microsoft:channel-correlation-id" +#define TWIN_API_VERSION_PROPERTY_NAME "com.microsoft:api-version" +#define TWIN_CORRELATION_ID_PROPERTY_FORMAT "twin:%s" +#define TWIN_API_VERSION_NUMBER "2016-11-14" -#define DEFAULT_MAX_TWIN_SUBSCRIPTION_ERROR_COUNT 3 -#define DEFAULT_TWIN_OPERATION_TIMEOUT_SECS 300.0 +#define DEFAULT_MAX_TWIN_SUBSCRIPTION_ERROR_COUNT 3 +#define DEFAULT_TWIN_OPERATION_TIMEOUT_SECS 300.0 -static char* DEFAULT_TWIN_SEND_LINK_SOURCE_NAME = "twin"; -static char* DEFAULT_TWIN_RECEIVE_LINK_TARGET_NAME = "twin"; +static char* DEFAULT_TWIN_SEND_LINK_SOURCE_NAME = "twin"; +static char* DEFAULT_TWIN_RECEIVE_LINK_TARGET_NAME = "twin"; -static const char* TWIN_OPERATION_PATCH = "PATCH"; -static const char* TWIN_OPERATION_GET = "GET"; -static const char* TWIN_OPERATION_PUT = "PUT"; -static const char* TWIN_OPERATION_DELETE = "DELETE"; +static const char* TWIN_OPERATION_PATCH = "PATCH"; +static const char* TWIN_OPERATION_GET = "GET"; +static const char* TWIN_OPERATION_PUT = "PUT"; +static const char* TWIN_OPERATION_DELETE = "DELETE"; #define TWIN_OPERATION_TYPE_STRINGS \ - TWIN_OPERATION_TYPE_PATCH, \ - TWIN_OPERATION_TYPE_GET, \ - TWIN_OPERATION_TYPE_PUT, \ - TWIN_OPERATION_TYPE_DELETE + TWIN_OPERATION_TYPE_PATCH, \ + TWIN_OPERATION_TYPE_GET, \ + TWIN_OPERATION_TYPE_PUT, \ + TWIN_OPERATION_TYPE_DELETE DEFINE_LOCAL_ENUM(TWIN_OPERATION_TYPE, TWIN_OPERATION_TYPE_STRINGS); #define TWIN_SUBSCRIPTION_STATE_STRINGS \ - TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED, \ - TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES, \ - TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES, \ - TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES, \ - TWIN_SUBSCRIPTION_STATE_SUBSCRIBING, \ - TWIN_SUBSCRIPTION_STATE_SUBSCRIBED, \ - TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE, \ - TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING + TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED, \ + TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES, \ + TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES, \ + TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES, \ + TWIN_SUBSCRIPTION_STATE_SUBSCRIBING, \ + TWIN_SUBSCRIPTION_STATE_SUBSCRIBED, \ + TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE, \ + TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING -// Suppress unused function warning for TWIN_SUBSCRIPTION_STATEstrings +// Suppress unused function warning for TWIN_SUBSCRIPTION_STATEstrings #ifdef __APPLE__ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wunused-function" @@ -88,45 +88,45 @@ typedef struct TWIN_MESSENGER_INSTANCE_TAG { - char* client_version; - char* device_id; - char* module_id; - char* iothub_host_fqdn; + char* client_version; + char* device_id; + char* module_id; + char* iothub_host_fqdn; - TWIN_MESSENGER_STATE state; + TWIN_MESSENGER_STATE state; + + SINGLYLINKEDLIST_HANDLE pending_patches; + SINGLYLINKEDLIST_HANDLE operations; - SINGLYLINKEDLIST_HANDLE pending_patches; - SINGLYLINKEDLIST_HANDLE operations; - - TWIN_MESSENGER_STATE_CHANGED_CALLBACK on_state_changed_callback; - void* on_state_changed_context; + TWIN_MESSENGER_STATE_CHANGED_CALLBACK on_state_changed_callback; + void* on_state_changed_context; - TWIN_SUBSCRIPTION_STATE subscription_state; - size_t subscription_error_count; - TWIN_STATE_UPDATE_CALLBACK on_message_received_callback; - void* on_message_received_context; + TWIN_SUBSCRIPTION_STATE subscription_state; + size_t subscription_error_count; + TWIN_STATE_UPDATE_CALLBACK on_message_received_callback; + void* on_message_received_context; - AMQP_MESSENGER_HANDLE amqp_msgr; - AMQP_MESSENGER_STATE amqp_msgr_state; - bool amqp_msgr_is_subscribed; + AMQP_MESSENGER_HANDLE amqp_msgr; + AMQP_MESSENGER_STATE amqp_msgr_state; + bool amqp_msgr_is_subscribed; } TWIN_MESSENGER_INSTANCE; typedef struct TWIN_PATCH_OPERATION_CONTEXT_TAG { - CONSTBUFFER_HANDLE data; - TWIN_MESSENGER_REPORT_STATE_COMPLETE_CALLBACK on_report_state_complete_callback; - const void* on_report_state_complete_context; - time_t time_enqueued; + CONSTBUFFER_HANDLE data; + TWIN_MESSENGER_REPORT_STATE_COMPLETE_CALLBACK on_report_state_complete_callback; + const void* on_report_state_complete_context; + time_t time_enqueued; } TWIN_PATCH_OPERATION_CONTEXT; typedef struct TWIN_OPERATION_CONTEXT_TAG { - TWIN_OPERATION_TYPE type; - TWIN_MESSENGER_INSTANCE* msgr; - char* correlation_id; - TWIN_MESSENGER_REPORT_STATE_COMPLETE_CALLBACK on_report_state_complete_callback; - const void* on_report_state_complete_context; - time_t time_sent; + TWIN_OPERATION_TYPE type; + TWIN_MESSENGER_INSTANCE* msgr; + char* correlation_id; + TWIN_MESSENGER_REPORT_STATE_COMPLETE_CALLBACK on_report_state_complete_callback; + const void* on_report_state_complete_context; + time_t time_sent; } TWIN_OPERATION_CONTEXT; @@ -134,191 +134,191 @@ static void update_state(TWIN_MESSENGER_INSTANCE* twin_msgr, TWIN_MESSENGER_STATE new_state) { - if (new_state != twin_msgr->state) - { - TWIN_MESSENGER_STATE previous_state = twin_msgr->state; - twin_msgr->state = new_state; + if (new_state != twin_msgr->state) + { + TWIN_MESSENGER_STATE previous_state = twin_msgr->state; + twin_msgr->state = new_state; - if (twin_msgr->on_state_changed_callback != NULL) - { - twin_msgr->on_state_changed_callback(twin_msgr->on_state_changed_context, previous_state, new_state); - } - } + if (twin_msgr->on_state_changed_callback != NULL) + { + twin_msgr->on_state_changed_callback(twin_msgr->on_state_changed_context, previous_state, new_state); + } + } } //---------- AMQP Helper Functions ----------// static int set_message_correlation_id(MESSAGE_HANDLE message, const char* value) { - int result; - PROPERTIES_HANDLE properties = NULL; + int result; + PROPERTIES_HANDLE properties = NULL; - if (message_get_properties(message, &properties) != 0) - { - LogError("Failed getting the AMQP message properties"); - result = __FAILURE__; - } - else if (properties == NULL && (properties = properties_create()) == NULL) - { - LogError("Failed creating properties for AMQP message"); - result = __FAILURE__; - } - else - { - AMQP_VALUE amqp_value; + if (message_get_properties(message, &properties) != 0) + { + LogError("Failed getting the AMQP message properties"); + result = __FAILURE__; + } + else if (properties == NULL && (properties = properties_create()) == NULL) + { + LogError("Failed creating properties for AMQP message"); + result = __FAILURE__; + } + else + { + AMQP_VALUE amqp_value; - if ((amqp_value = amqpvalue_create_string(value)) == NULL) - { - LogError("Failed creating AMQP value for correlation-id"); - result = __FAILURE__; - } - else - { - if (properties_set_correlation_id(properties, amqp_value) != RESULT_OK) - { - LogError("Failed setting the correlation id"); - result = __FAILURE__; - } - else if (message_set_properties(message, properties) != RESULT_OK) - { - LogError("Failed setting the AMQP message properties"); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } + if ((amqp_value = amqpvalue_create_string(value)) == NULL) + { + LogError("Failed creating AMQP value for correlation-id"); + result = __FAILURE__; + } + else + { + if (properties_set_correlation_id(properties, amqp_value) != RESULT_OK) + { + LogError("Failed setting the correlation id"); + result = __FAILURE__; + } + else if (message_set_properties(message, properties) != RESULT_OK) + { + LogError("Failed setting the AMQP message properties"); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } - amqpvalue_destroy(amqp_value); - } + amqpvalue_destroy(amqp_value); + } - properties_destroy(properties); - } + properties_destroy(properties); + } - return result; + return result; } static int get_message_correlation_id(MESSAGE_HANDLE message, char** correlation_id) { - int result; + int result; - PROPERTIES_HANDLE properties; - AMQP_VALUE amqp_value; + PROPERTIES_HANDLE properties; + AMQP_VALUE amqp_value; - if (message_get_properties(message, &properties) != 0) - { - LogError("Failed getting AMQP message properties"); - result = __FAILURE__; - } - else if (properties == NULL) - { - *correlation_id = NULL; - result = RESULT_OK; - } - else - { - if (properties_get_correlation_id(properties, &amqp_value) != 0 || amqp_value == NULL) - { - *correlation_id = NULL; - result = RESULT_OK; - } - else - { - const char* value; + if (message_get_properties(message, &properties) != 0) + { + LogError("Failed getting AMQP message properties"); + result = __FAILURE__; + } + else if (properties == NULL) + { + *correlation_id = NULL; + result = RESULT_OK; + } + else + { + if (properties_get_correlation_id(properties, &amqp_value) != 0 || amqp_value == NULL) + { + *correlation_id = NULL; + result = RESULT_OK; + } + else + { + const char* value; - if (amqpvalue_get_string(amqp_value, &value) != 0) - { - LogError("Failed retrieving string from AMQP value"); - result = __FAILURE__; - } - else if (mallocAndStrcpy_s(correlation_id, value) != 0) - { - LogError("Failed cloning correlation-id"); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } - } + if (amqpvalue_get_string(amqp_value, &value) != 0) + { + LogError("Failed retrieving string from AMQP value"); + result = __FAILURE__; + } + else if (mallocAndStrcpy_s(correlation_id, value) != 0) + { + LogError("Failed cloning correlation-id"); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } + } - properties_destroy(properties); - } + properties_destroy(properties); + } - return result; + return result; } static int add_map_item(AMQP_VALUE map, const char* name, const char* value) { - int result; - AMQP_VALUE amqp_value_name; + int result; + AMQP_VALUE amqp_value_name; + + if ((amqp_value_name = amqpvalue_create_symbol(name)) == NULL) + { + LogError("Failed creating AMQP_VALUE for name"); + result = __FAILURE__; + } + else + { + AMQP_VALUE amqp_value_value = NULL; - if ((amqp_value_name = amqpvalue_create_symbol(name)) == NULL) - { - LogError("Failed creating AMQP_VALUE for name"); - result = __FAILURE__; - } - else - { - AMQP_VALUE amqp_value_value = NULL; - - if (value == NULL && (amqp_value_value = amqpvalue_create_null()) == NULL) - { - LogError("Failed creating AMQP_VALUE for NULL value"); - result = __FAILURE__; - } - else if (value != NULL && (amqp_value_value = amqpvalue_create_string(value)) == NULL) - { - LogError("Failed creating AMQP_VALUE for value"); - result = __FAILURE__; - } - else - { - if (amqpvalue_set_map_value(map, amqp_value_name, amqp_value_value) != 0) - { - LogError("Failed adding key/value pair to map"); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } + if (value == NULL && (amqp_value_value = amqpvalue_create_null()) == NULL) + { + LogError("Failed creating AMQP_VALUE for NULL value"); + result = __FAILURE__; + } + else if (value != NULL && (amqp_value_value = amqpvalue_create_string(value)) == NULL) + { + LogError("Failed creating AMQP_VALUE for value"); + result = __FAILURE__; + } + else + { + if (amqpvalue_set_map_value(map, amqp_value_name, amqp_value_value) != 0) + { + LogError("Failed adding key/value pair to map"); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } - amqpvalue_destroy(amqp_value_value); - } + amqpvalue_destroy(amqp_value_value); + } - amqpvalue_destroy(amqp_value_name); - } + amqpvalue_destroy(amqp_value_name); + } - return result; + return result; } static int add_amqp_message_annotation(MESSAGE_HANDLE message, AMQP_VALUE msg_annotations_map) { - int result; - AMQP_VALUE msg_annotations; + int result; + AMQP_VALUE msg_annotations; - if ((msg_annotations = amqpvalue_create_message_annotations(msg_annotations_map)) == NULL) - { - LogError("Failed creating new AMQP message annotations"); - result = __FAILURE__; - } - else - { - if (message_set_message_annotations(message, (annotations)msg_annotations) != 0) - { - LogError("Failed setting AMQP message annotations"); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } + if ((msg_annotations = amqpvalue_create_message_annotations(msg_annotations_map)) == NULL) + { + LogError("Failed creating new AMQP message annotations"); + result = __FAILURE__; + } + else + { + if (message_set_message_annotations(message, (annotations)msg_annotations) != 0) + { + LogError("Failed setting AMQP message annotations"); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } - annotations_destroy(msg_annotations); - } + annotations_destroy(msg_annotations); + } - return result; + return result; } @@ -326,650 +326,650 @@ static char* generate_unique_id() { - char* result; + char* result; - if ((result = (char*)malloc(sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1)) == NULL) - { - LogError("Failed generating an unique tag (malloc failed)"); - } - else - { - memset(result, 0, sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1); + if ((result = (char*)malloc(sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1)) == NULL) + { + LogError("Failed generating an unique tag (malloc failed)"); + } + else + { + memset(result, 0, sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1); - if (UniqueId_Generate(result, UNIQUE_ID_BUFFER_SIZE) != UNIQUEID_OK) - { - LogError("Failed generating an unique tag (UniqueId_Generate failed)"); - free(result); - result = NULL; - } - } + if (UniqueId_Generate(result, UNIQUE_ID_BUFFER_SIZE) != UNIQUEID_OK) + { + LogError("Failed generating an unique tag (UniqueId_Generate failed)"); + free(result); + result = NULL; + } + } - return result; + return result; } static char* generate_twin_correlation_id() { - char* result; - char* unique_id; + char* result; + char* unique_id; - if ((unique_id = generate_unique_id()) == NULL) - { - LogError("Failed generating unique ID for correlation-id"); - result = NULL; - } - else - { - if ((result = (char*)malloc(strlen(TWIN_CORRELATION_ID_PROPERTY_FORMAT) + strlen(unique_id) + 1)) == NULL) - { - LogError("Failed allocating correlation-id"); - result = NULL; - } - else - { - (void)sprintf(result, TWIN_CORRELATION_ID_PROPERTY_FORMAT, unique_id); - } + if ((unique_id = generate_unique_id()) == NULL) + { + LogError("Failed generating unique ID for correlation-id"); + result = NULL; + } + else + { + if ((result = (char*)malloc(strlen(TWIN_CORRELATION_ID_PROPERTY_FORMAT) + strlen(unique_id) + 1)) == NULL) + { + LogError("Failed allocating correlation-id"); + result = NULL; + } + else + { + (void)sprintf(result, TWIN_CORRELATION_ID_PROPERTY_FORMAT, unique_id); + } - free(unique_id); - } + free(unique_id); + } - return result; + return result; } static TWIN_OPERATION_CONTEXT* create_twin_operation_context(TWIN_MESSENGER_INSTANCE* twin_msgr, TWIN_OPERATION_TYPE type) { - TWIN_OPERATION_CONTEXT* result; + TWIN_OPERATION_CONTEXT* result; - if ((result = (TWIN_OPERATION_CONTEXT*)malloc(sizeof(TWIN_OPERATION_CONTEXT))) == NULL) - { - LogError("Failed creating context for %s (%s)", ENUM_TO_STRING(TWIN_OPERATION_TYPE, type), twin_msgr->device_id); - } - else - { - memset(result, 0, sizeof(TWIN_OPERATION_CONTEXT)); + if ((result = (TWIN_OPERATION_CONTEXT*)malloc(sizeof(TWIN_OPERATION_CONTEXT))) == NULL) + { + LogError("Failed creating context for %s (%s)", ENUM_TO_STRING(TWIN_OPERATION_TYPE, type), twin_msgr->device_id); + } + else + { + memset(result, 0, sizeof(TWIN_OPERATION_CONTEXT)); - if ((result->correlation_id = generate_unique_id()) == NULL) - { - LogError("Failed setting context correlation-id (%s, %s)", ENUM_TO_STRING(TWIN_OPERATION_TYPE, type), twin_msgr->device_id); - free(result); - result = NULL; - } - else - { - result->type = type; - result->msgr = twin_msgr; - } - } + if ((result->correlation_id = generate_unique_id()) == NULL) + { + LogError("Failed setting context correlation-id (%s, %s)", ENUM_TO_STRING(TWIN_OPERATION_TYPE, type), twin_msgr->device_id); + free(result); + result = NULL; + } + else + { + result->type = type; + result->msgr = twin_msgr; + } + } - return result; + return result; } static bool find_twin_operation_by_correlation_id(LIST_ITEM_HANDLE list_item, const void* match_context) { - TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)singlylinkedlist_item_get_value(list_item); - return (strcmp(twin_op_ctx->correlation_id, (const char*)match_context) == 0); + TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)singlylinkedlist_item_get_value(list_item); + return (strcmp(twin_op_ctx->correlation_id, (const char*)match_context) == 0); } static bool find_twin_operation_by_type(LIST_ITEM_HANDLE list_item, const void* match_context) { - TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)singlylinkedlist_item_get_value(list_item); - return (twin_op_ctx->type == *(TWIN_OPERATION_TYPE*)match_context); + TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)singlylinkedlist_item_get_value(list_item); + return (twin_op_ctx->type == *(TWIN_OPERATION_TYPE*)match_context); } static void destroy_twin_operation_context(TWIN_OPERATION_CONTEXT* op_ctx) { - free(op_ctx->correlation_id); - free(op_ctx); + free(op_ctx->correlation_id); + free(op_ctx); } static int add_twin_operation_context_to_queue(TWIN_OPERATION_CONTEXT* twin_op_ctx) { - int result; + int result; - if (singlylinkedlist_add(twin_op_ctx->msgr->operations, (const void*)twin_op_ctx) == NULL) - { - LogError("Failed adding TWIN operation context to queue (%s, %s)", ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), twin_op_ctx->correlation_id); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } + if (singlylinkedlist_add(twin_op_ctx->msgr->operations, (const void*)twin_op_ctx) == NULL) + { + LogError("Failed adding TWIN operation context to queue (%s, %s)", ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), twin_op_ctx->correlation_id); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } - return result; + return result; } static int remove_twin_operation_context_from_queue(TWIN_OPERATION_CONTEXT* twin_op_ctx) { - int result; - LIST_ITEM_HANDLE list_item; + int result; + LIST_ITEM_HANDLE list_item; - if ((list_item = singlylinkedlist_find(twin_op_ctx->msgr->operations, find_twin_operation_by_correlation_id, (const void*)twin_op_ctx->correlation_id)) == NULL) - { - result = RESULT_OK; - } - else if (singlylinkedlist_remove(twin_op_ctx->msgr->operations, list_item) != 0) - { - LogError("Failed removing TWIN operation context from queue (%s, %s, %s)", - twin_op_ctx->msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), twin_op_ctx->correlation_id); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } + if ((list_item = singlylinkedlist_find(twin_op_ctx->msgr->operations, find_twin_operation_by_correlation_id, (const void*)twin_op_ctx->correlation_id)) == NULL) + { + result = RESULT_OK; + } + else if (singlylinkedlist_remove(twin_op_ctx->msgr->operations, list_item) != 0) + { + LogError("Failed removing TWIN operation context from queue (%s, %s, %s)", + twin_op_ctx->msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), twin_op_ctx->correlation_id); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } - return result; + return result; } //---------- TWIN <-> AMQP Translation Functions ----------// static int parse_incoming_twin_message(MESSAGE_HANDLE message, - char** correlation_id, - bool* has_version, int64_t* version, - bool* has_status_code, int* status_code, - bool* has_twin_report, BINARY_DATA* twin_report) + char** correlation_id, + bool* has_version, int64_t* version, + bool* has_status_code, int* status_code, + bool* has_twin_report, BINARY_DATA* twin_report) { - int result; + int result; - if (get_message_correlation_id(message, correlation_id) != 0) - { - LogError("Failed retrieving correlation ID from received TWIN message."); - result = __FAILURE__; - } - else - { - annotations message_annotations; + if (get_message_correlation_id(message, correlation_id) != 0) + { + LogError("Failed retrieving correlation ID from received TWIN message."); + result = __FAILURE__; + } + else + { + annotations message_annotations; - if (message_get_message_annotations(message, &message_annotations) != 0) - { - LogError("Failed getting TWIN message annotations"); - result = __FAILURE__; - } - else - { - result = RESULT_OK; + if (message_get_message_annotations(message, &message_annotations) != 0) + { + LogError("Failed getting TWIN message annotations"); + result = __FAILURE__; + } + else + { + result = RESULT_OK; - if (message_annotations == NULL) - { - *has_version = false; - *has_status_code = false; - } - else - { - uint32_t pair_count; - if (amqpvalue_get_map_pair_count((AMQP_VALUE)message_annotations, &pair_count) != 0) - { - LogError("Failed getting TWIN message annotations count"); - result = __FAILURE__; - } - else - { - uint32_t i; + if (message_annotations == NULL) + { + *has_version = false; + *has_status_code = false; + } + else + { + uint32_t pair_count; + if (amqpvalue_get_map_pair_count((AMQP_VALUE)message_annotations, &pair_count) != 0) + { + LogError("Failed getting TWIN message annotations count"); + result = __FAILURE__; + } + else + { + uint32_t i; - *has_status_code = false; - *has_version = false; + *has_status_code = false; + *has_version = false; - for (i = 0; i < pair_count; i++) - { - AMQP_VALUE amqp_map_key; - AMQP_VALUE amqp_map_value; + for (i = 0; i < pair_count; i++) + { + AMQP_VALUE amqp_map_key; + AMQP_VALUE amqp_map_value; - if (amqpvalue_get_map_key_value_pair((AMQP_VALUE)message_annotations, i, &amqp_map_key, &amqp_map_value) != 0) - { - LogError("Failed getting AMQP map key/value pair (%d)", i); - result = __FAILURE__; - } - else - { - const char* map_key_name; + if (amqpvalue_get_map_key_value_pair((AMQP_VALUE)message_annotations, i, &amqp_map_key, &amqp_map_value) != 0) + { + LogError("Failed getting AMQP map key/value pair (%d)", i); + result = __FAILURE__; + } + else + { + const char* map_key_name; - if (amqpvalue_get_symbol(amqp_map_key, &map_key_name) != 0) - { - LogError("Failed getting AMQP value symbol"); - result = __FAILURE__; - break; - } - else - { - if (strcmp(TWIN_MESSAGE_PROPERTY_STATUS, map_key_name) == 0) - { - if (amqpvalue_get_type(amqp_map_value) != AMQP_TYPE_INT) - { - LogError("TWIN message status property expected to be INT"); - result = __FAILURE__; - break; - } - else if (amqpvalue_get_int(amqp_map_value, status_code) != 0) - { - LogError("Failed getting TWIN message status code value"); - result = __FAILURE__; - break; - } - else - { - *has_status_code = true; - } - } - else if (strcmp(TWIN_MESSAGE_PROPERTY_VERSION, map_key_name) == 0) - { - if (amqpvalue_get_type(amqp_map_value) != AMQP_TYPE_LONG) - { - LogError("TWIN message version property expected to be LONG"); - result = __FAILURE__; - break; - } - else if (amqpvalue_get_long(amqp_map_value, version) != 0) - { - LogError("Failed getting TWIN message version value"); - result = __FAILURE__; - break; - } - else - { - *has_version = true; - } - } + if (amqpvalue_get_symbol(amqp_map_key, &map_key_name) != 0) + { + LogError("Failed getting AMQP value symbol"); + result = __FAILURE__; + break; + } + else + { + if (strcmp(TWIN_MESSAGE_PROPERTY_STATUS, map_key_name) == 0) + { + if (amqpvalue_get_type(amqp_map_value) != AMQP_TYPE_INT) + { + LogError("TWIN message status property expected to be INT"); + result = __FAILURE__; + break; + } + else if (amqpvalue_get_int(amqp_map_value, status_code) != 0) + { + LogError("Failed getting TWIN message status code value"); + result = __FAILURE__; + break; + } + else + { + *has_status_code = true; + } + } + else if (strcmp(TWIN_MESSAGE_PROPERTY_VERSION, map_key_name) == 0) + { + if (amqpvalue_get_type(amqp_map_value) != AMQP_TYPE_LONG) + { + LogError("TWIN message version property expected to be LONG"); + result = __FAILURE__; + break; + } + else if (amqpvalue_get_long(amqp_map_value, version) != 0) + { + LogError("Failed getting TWIN message version value"); + result = __FAILURE__; + break; + } + else + { + *has_version = true; + } + } - amqpvalue_destroy(amqp_map_value); - } + amqpvalue_destroy(amqp_map_value); + } - amqpvalue_destroy(amqp_map_key); - } - } - } + amqpvalue_destroy(amqp_map_key); + } + } + } - amqpvalue_destroy(message_annotations); - } + amqpvalue_destroy(message_annotations); + } - if (result == RESULT_OK) - { - MESSAGE_BODY_TYPE body_type; + if (result == RESULT_OK) + { + MESSAGE_BODY_TYPE body_type; - if (message_get_body_type(message, &body_type) != 0) - { - LogError("Failed getting TWIN message body type"); - result = __FAILURE__; - } - else if (body_type == MESSAGE_BODY_TYPE_DATA) - { - size_t body_count; + if (message_get_body_type(message, &body_type) != 0) + { + LogError("Failed getting TWIN message body type"); + result = __FAILURE__; + } + else if (body_type == MESSAGE_BODY_TYPE_DATA) + { + size_t body_count; - if (message_get_body_amqp_data_count(message, &body_count) != 0) - { - LogError("Failed getting TWIN message body count"); - result = __FAILURE__; - } - else if (body_count != 1) - { - LogError("Unexpected number of TWIN message bodies (%d)", body_count); - result = __FAILURE__; - } - else if (message_get_body_amqp_data_in_place(message, 0, twin_report) != 0) - { - LogError("Failed getting TWIN message body"); - result = __FAILURE__; - } - else - { - *has_twin_report = true; - } - } - else if (body_type != MESSAGE_BODY_TYPE_NONE) - { - LogError("Unexpected TWIN message body %d", body_type); - result = __FAILURE__; - } - else - { - *has_twin_report = false; - } - } - } + if (message_get_body_amqp_data_count(message, &body_count) != 0) + { + LogError("Failed getting TWIN message body count"); + result = __FAILURE__; + } + else if (body_count != 1) + { + LogError("Unexpected number of TWIN message bodies (%d)", body_count); + result = __FAILURE__; + } + else if (message_get_body_amqp_data_in_place(message, 0, twin_report) != 0) + { + LogError("Failed getting TWIN message body"); + result = __FAILURE__; + } + else + { + *has_twin_report = true; + } + } + else if (body_type != MESSAGE_BODY_TYPE_NONE) + { + LogError("Unexpected TWIN message body %d", body_type); + result = __FAILURE__; + } + else + { + *has_twin_report = false; + } + } + } - if (result != RESULT_OK) - { - free(*correlation_id); - *correlation_id = NULL; - } - } + if (result != RESULT_OK) + { + free(*correlation_id); + *correlation_id = NULL; + } + } - return result; + return result; } static void destroy_link_attach_properties(MAP_HANDLE properties) { - Map_Destroy(properties); + Map_Destroy(properties); } static MAP_HANDLE create_link_attach_properties(TWIN_MESSENGER_INSTANCE* twin_msgr) { - MAP_HANDLE result; + MAP_HANDLE result; - if ((result = Map_Create(NULL)) == NULL) - { - LogError("Failed creating map for AMQP link properties (%s)", twin_msgr->device_id); - } - else - { - char* correlation_id; + if ((result = Map_Create(NULL)) == NULL) + { + LogError("Failed creating map for AMQP link properties (%s)", twin_msgr->device_id); + } + else + { + char* correlation_id; - if ((correlation_id = generate_twin_correlation_id()) == NULL) - { - LogError("Failed adding AMQP link property "); - destroy_link_attach_properties(result); - result = NULL; - } - else - { - if (Map_Add(result, CLIENT_VERSION_PROPERTY_NAME, twin_msgr->client_version) != MAP_OK) - { - LogError("Failed adding AMQP link property 'client version' (%s)", twin_msgr->device_id); - destroy_link_attach_properties(result); - result = NULL; - } - else if (Map_Add(result, TWIN_CORRELATION_ID_PROPERTY_NAME, correlation_id) != MAP_OK) - { - LogError("Failed adding AMQP link property 'correlation-id' (%s)", twin_msgr->device_id); - destroy_link_attach_properties(result); - result = NULL; - } - else if (Map_Add(result, TWIN_API_VERSION_PROPERTY_NAME, TWIN_API_VERSION_NUMBER) != MAP_OK) - { - LogError("Failed adding AMQP link property 'api-version' (%s)", twin_msgr->device_id); - destroy_link_attach_properties(result); - result = NULL; - } + if ((correlation_id = generate_twin_correlation_id()) == NULL) + { + LogError("Failed adding AMQP link property "); + destroy_link_attach_properties(result); + result = NULL; + } + else + { + if (Map_Add(result, CLIENT_VERSION_PROPERTY_NAME, twin_msgr->client_version) != MAP_OK) + { + LogError("Failed adding AMQP link property 'client version' (%s)", twin_msgr->device_id); + destroy_link_attach_properties(result); + result = NULL; + } + else if (Map_Add(result, TWIN_CORRELATION_ID_PROPERTY_NAME, correlation_id) != MAP_OK) + { + LogError("Failed adding AMQP link property 'correlation-id' (%s)", twin_msgr->device_id); + destroy_link_attach_properties(result); + result = NULL; + } + else if (Map_Add(result, TWIN_API_VERSION_PROPERTY_NAME, TWIN_API_VERSION_NUMBER) != MAP_OK) + { + LogError("Failed adding AMQP link property 'api-version' (%s)", twin_msgr->device_id); + destroy_link_attach_properties(result); + result = NULL; + } - free(correlation_id); - } - } + free(correlation_id); + } + } - return result; + return result; } static const char* get_twin_operation_name(TWIN_OPERATION_TYPE op_type) { - const char* result; + const char* result; - switch (op_type) - { - default: - LogError("Unrecognized TWIN operation (%s)", ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_type)); - result = NULL; - break; - case TWIN_OPERATION_TYPE_PATCH: - result = TWIN_OPERATION_PATCH; - break; - case TWIN_OPERATION_TYPE_GET: - result = TWIN_OPERATION_GET; - break; - case TWIN_OPERATION_TYPE_PUT: - result = TWIN_OPERATION_PUT; - break; - case TWIN_OPERATION_TYPE_DELETE: - result = TWIN_OPERATION_DELETE; - break; - } + switch (op_type) + { + default: + LogError("Unrecognized TWIN operation (%s)", ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_type)); + result = NULL; + break; + case TWIN_OPERATION_TYPE_PATCH: + result = TWIN_OPERATION_PATCH; + break; + case TWIN_OPERATION_TYPE_GET: + result = TWIN_OPERATION_GET; + break; + case TWIN_OPERATION_TYPE_PUT: + result = TWIN_OPERATION_PUT; + break; + case TWIN_OPERATION_TYPE_DELETE: + result = TWIN_OPERATION_DELETE; + break; + } - return result; + return result; } static MESSAGE_HANDLE create_amqp_message_for_twin_operation(TWIN_OPERATION_TYPE op_type, char* correlation_id, CONSTBUFFER_HANDLE data) { - MESSAGE_HANDLE result; - const char* twin_op_name; + MESSAGE_HANDLE result; + const char* twin_op_name; - if ((twin_op_name = get_twin_operation_name(op_type))== NULL) - { - result = NULL; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_063: [An `amqp_message` (MESSAGE_HANDLE) shall be created using message_create()] - else if ((result = message_create()) == NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_064: [If message_create() fails, message_create_for_twin_operation shall fail and return NULL] - LogError("Failed creating AMQP message (%s)", twin_op_name); - } - else - { - AMQP_VALUE msg_annotations_map; + if ((twin_op_name = get_twin_operation_name(op_type))== NULL) + { + result = NULL; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_063: [An `amqp_message` (MESSAGE_HANDLE) shall be created using message_create()] + else if ((result = message_create()) == NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_064: [If message_create() fails, message_create_for_twin_operation shall fail and return NULL] + LogError("Failed creating AMQP message (%s)", twin_op_name); + } + else + { + AMQP_VALUE msg_annotations_map; - if ((msg_annotations_map = amqpvalue_create_map()) == NULL) - { - LogError("Failed creating map for message annotations"); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] - message_destroy(result); - result = NULL; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_065: [`operation=<op_type>` (GET/PATCH/PUT/DELETE) must be added to the `amqp_message` annotations] - if (add_map_item(msg_annotations_map, TWIN_MESSAGE_PROPERTY_OPERATION, twin_op_name) != RESULT_OK) - { - LogError("Failed adding operation to AMQP message annotations (%s)", twin_op_name); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] - message_destroy(result); - result = NULL; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_066: [If `op_type` is PATCH, `resource=/properties/reported` must be added to the `amqp_message` annotations] - else if ((op_type == TWIN_OPERATION_TYPE_PATCH) && - add_map_item(msg_annotations_map, TWIN_MESSAGE_PROPERTY_RESOURCE, TWIN_RESOURCE_REPORTED) != RESULT_OK) - { - LogError("Failed adding resource to AMQP message annotations (%s)", twin_op_name); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] - message_destroy(result); - result = NULL; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_067: [If `op_type` is PUT or DELETE, `resource=/notifications/twin/properties/desired` must be added to the `amqp_message` annotations] - else if ((op_type == TWIN_OPERATION_TYPE_PUT || op_type == TWIN_OPERATION_TYPE_DELETE) && - add_map_item(msg_annotations_map, TWIN_MESSAGE_PROPERTY_RESOURCE, TWIN_RESOURCE_DESIRED) != RESULT_OK) - { - LogError("Failed adding resource to AMQP message annotations (%s)", twin_op_name); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] - message_destroy(result); - result = NULL; - } - else if (add_amqp_message_annotation(result, msg_annotations_map) != RESULT_OK) - { - LogError("Failed adding annotations to AMQP message (%s)", twin_op_name); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] - message_destroy(result); - result = NULL; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_068: [The `correlation-id` property of `amqp_message` shall be set with an UUID string] - else if (set_message_correlation_id(result, correlation_id) != RESULT_OK) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_069: [If setting `correlation-id` fails, message_create_for_twin_operation shall fail and return NULL] - LogError("Failed AMQP message correlation-id (%s)", twin_op_name); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] - message_destroy(result); - result = NULL; - } - else - { - BINARY_DATA binary_data; + if ((msg_annotations_map = amqpvalue_create_map()) == NULL) + { + LogError("Failed creating map for message annotations"); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] + message_destroy(result); + result = NULL; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_065: [`operation=<op_type>` (GET/PATCH/PUT/DELETE) must be added to the `amqp_message` annotations] + if (add_map_item(msg_annotations_map, TWIN_MESSAGE_PROPERTY_OPERATION, twin_op_name) != RESULT_OK) + { + LogError("Failed adding operation to AMQP message annotations (%s)", twin_op_name); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] + message_destroy(result); + result = NULL; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_066: [If `op_type` is PATCH, `resource=/properties/reported` must be added to the `amqp_message` annotations] + else if ((op_type == TWIN_OPERATION_TYPE_PATCH) && + add_map_item(msg_annotations_map, TWIN_MESSAGE_PROPERTY_RESOURCE, TWIN_RESOURCE_REPORTED) != RESULT_OK) + { + LogError("Failed adding resource to AMQP message annotations (%s)", twin_op_name); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] + message_destroy(result); + result = NULL; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_067: [If `op_type` is PUT or DELETE, `resource=/notifications/twin/properties/desired` must be added to the `amqp_message` annotations] + else if ((op_type == TWIN_OPERATION_TYPE_PUT || op_type == TWIN_OPERATION_TYPE_DELETE) && + add_map_item(msg_annotations_map, TWIN_MESSAGE_PROPERTY_RESOURCE, TWIN_RESOURCE_DESIRED) != RESULT_OK) + { + LogError("Failed adding resource to AMQP message annotations (%s)", twin_op_name); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] + message_destroy(result); + result = NULL; + } + else if (add_amqp_message_annotation(result, msg_annotations_map) != RESULT_OK) + { + LogError("Failed adding annotations to AMQP message (%s)", twin_op_name); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] + message_destroy(result); + result = NULL; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_068: [The `correlation-id` property of `amqp_message` shall be set with an UUID string] + else if (set_message_correlation_id(result, correlation_id) != RESULT_OK) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_069: [If setting `correlation-id` fails, message_create_for_twin_operation shall fail and return NULL] + LogError("Failed AMQP message correlation-id (%s)", twin_op_name); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] + message_destroy(result); + result = NULL; + } + else + { + BINARY_DATA binary_data; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_070: [If `data` is not NULL, it shall be added to `amqp_message` using message_add_body_amqp_data()] - if (data != NULL) - { - const CONSTBUFFER* data_buffer; - data_buffer = CONSTBUFFER_GetContent(data); - binary_data.bytes = data_buffer->buffer; - binary_data.length = data_buffer->size; - } - else - { - binary_data.bytes = EMPTY_TWIN_BODY_DATA; - binary_data.length = EMPTY_TWIN_BODY_SIZE; - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_070: [If `data` is not NULL, it shall be added to `amqp_message` using message_add_body_amqp_data()] + if (data != NULL) + { + const CONSTBUFFER* data_buffer; + data_buffer = CONSTBUFFER_GetContent(data); + binary_data.bytes = data_buffer->buffer; + binary_data.length = data_buffer->size; + } + else + { + binary_data.bytes = EMPTY_TWIN_BODY_DATA; + binary_data.length = EMPTY_TWIN_BODY_SIZE; + } - if (message_add_body_amqp_data(result, binary_data) != 0) - { - LogError("Failed adding twin patch data to AMQP message body"); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] - message_destroy(result); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_071: [If message_add_body_amqp_data() fails, message_create_for_twin_operation shall fail and return NULL] - result = NULL; - } - } + if (message_add_body_amqp_data(result, binary_data) != 0) + { + LogError("Failed adding twin patch data to AMQP message body"); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_072: [If any errors occur, message_create_for_twin_operation shall release all memory it has allocated] + message_destroy(result); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_071: [If message_add_body_amqp_data() fails, message_create_for_twin_operation shall fail and return NULL] + result = NULL; + } + } - amqpvalue_destroy(msg_annotations_map); - } - } + amqpvalue_destroy(msg_annotations_map); + } + } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_073: [If no errors occur, message_create_for_twin_operation shall return `amqp_message`] - return result; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_073: [If no errors occur, message_create_for_twin_operation shall return `amqp_message`] + return result; } static TWIN_REPORT_STATE_RESULT get_twin_messenger_result_from(AMQP_MESSENGER_SEND_RESULT amqp_send_result) { - TWIN_REPORT_STATE_RESULT result; + TWIN_REPORT_STATE_RESULT result; - switch (amqp_send_result) - { - case AMQP_MESSENGER_SEND_RESULT_SUCCESS: - result = TWIN_REPORT_STATE_RESULT_SUCCESS; - break; - case AMQP_MESSENGER_SEND_RESULT_CANCELLED: - result = TWIN_REPORT_STATE_RESULT_CANCELLED; - break; - case AMQP_MESSENGER_SEND_RESULT_ERROR: - result = TWIN_REPORT_STATE_RESULT_ERROR; - break; - default: - LogError("Unrecognized enum value %s", ENUM_TO_STRING(AMQP_MESSENGER_SEND_RESULT, amqp_send_result)); - result = TWIN_REPORT_STATE_RESULT_ERROR; - break; - }; + switch (amqp_send_result) + { + case AMQP_MESSENGER_SEND_RESULT_SUCCESS: + result = TWIN_REPORT_STATE_RESULT_SUCCESS; + break; + case AMQP_MESSENGER_SEND_RESULT_CANCELLED: + result = TWIN_REPORT_STATE_RESULT_CANCELLED; + break; + case AMQP_MESSENGER_SEND_RESULT_ERROR: + result = TWIN_REPORT_STATE_RESULT_ERROR; + break; + default: + LogError("Unrecognized enum value %s", ENUM_TO_STRING(AMQP_MESSENGER_SEND_RESULT, amqp_send_result)); + result = TWIN_REPORT_STATE_RESULT_ERROR; + break; + }; - return result; + return result; } static TWIN_REPORT_STATE_REASON get_twin_messenger_reason_from(AMQP_MESSENGER_REASON amqp_reason) { - TWIN_REPORT_STATE_REASON result; + TWIN_REPORT_STATE_REASON result; - switch (amqp_reason) - { - case AMQP_MESSENGER_REASON_NONE: - result = TWIN_REPORT_STATE_REASON_NONE; - break; - case AMQP_MESSENGER_REASON_FAIL_SENDING: - result = TWIN_REPORT_STATE_REASON_FAIL_SENDING; - break; - case AMQP_MESSENGER_REASON_TIMEOUT: - result = TWIN_REPORT_STATE_REASON_TIMEOUT; - break; - case AMQP_MESSENGER_REASON_MESSENGER_DESTROYED: - result = TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED; - break; - case AMQP_MESSENGER_REASON_CANNOT_PARSE: - result = TWIN_REPORT_STATE_REASON_NONE; - break; - default: - LogError("Unrecognized enum value %s (%d)", ENUM_TO_STRING(AMQP_MESSENGER_REASON, amqp_reason), amqp_reason); - result = TWIN_REPORT_STATE_REASON_NONE; - break; - }; + switch (amqp_reason) + { + case AMQP_MESSENGER_REASON_NONE: + result = TWIN_REPORT_STATE_REASON_NONE; + break; + case AMQP_MESSENGER_REASON_FAIL_SENDING: + result = TWIN_REPORT_STATE_REASON_FAIL_SENDING; + break; + case AMQP_MESSENGER_REASON_TIMEOUT: + result = TWIN_REPORT_STATE_REASON_TIMEOUT; + break; + case AMQP_MESSENGER_REASON_MESSENGER_DESTROYED: + result = TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED; + break; + case AMQP_MESSENGER_REASON_CANNOT_PARSE: + result = TWIN_REPORT_STATE_REASON_NONE; + break; + default: + LogError("Unrecognized enum value %s (%d)", ENUM_TO_STRING(AMQP_MESSENGER_REASON, amqp_reason), amqp_reason); + result = TWIN_REPORT_STATE_REASON_NONE; + break; + }; - return result; + return result; } static void on_amqp_send_complete_callback(AMQP_MESSENGER_SEND_RESULT result, AMQP_MESSENGER_REASON reason, void* context) { - // Applicable to TWIN requests for reported state PATCH, GET, PUT, DELETE. + // Applicable to TWIN requests for reported state PATCH, GET, PUT, DELETE. - if (context == NULL) - { - LogError("Invalid argument (context is NULL)"); - } - else - { - TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)context; + if (context == NULL) + { + LogError("Invalid argument (context is NULL)"); + } + else + { + TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)context; - if (result != AMQP_MESSENGER_SEND_RESULT_SUCCESS) - { - if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_095: [If operation is a reported state PATCH, if a failure occurs `on_report_state_complete_callback` shall be invoked with TWIN_REPORT_STATE_RESULT_ERROR, status code from the AMQP response and the saved context] - if (twin_op_ctx->on_report_state_complete_callback != NULL) - { - TWIN_REPORT_STATE_RESULT callback_result; - TWIN_REPORT_STATE_REASON callback_reason; + if (result != AMQP_MESSENGER_SEND_RESULT_SUCCESS) + { + if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_095: [If operation is a reported state PATCH, if a failure occurs `on_report_state_complete_callback` shall be invoked with TWIN_REPORT_STATE_RESULT_ERROR, status code from the AMQP response and the saved context] + if (twin_op_ctx->on_report_state_complete_callback != NULL) + { + TWIN_REPORT_STATE_RESULT callback_result; + TWIN_REPORT_STATE_REASON callback_reason; - callback_result = get_twin_messenger_result_from(result); - callback_reason = get_twin_messenger_reason_from(reason); + callback_result = get_twin_messenger_result_from(result); + callback_reason = get_twin_messenger_reason_from(reason); - twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_NONE, 0, (void*)twin_op_ctx->on_report_state_complete_context); - } - } - else if (reason != AMQP_MESSENGER_REASON_MESSENGER_DESTROYED) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_096: [If operation is a GET/PUT/DELETE, if a failure occurs the TWIN messenger shall attempt to subscribe/unsubscribe again] - LogError("Failed sending TWIN operation request (%s, %s, %s, %s, %s)", - twin_op_ctx->msgr->device_id, - ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), - twin_op_ctx->correlation_id, - ENUM_TO_STRING(AMQP_MESSENGER_SEND_RESULT, result), ENUM_TO_STRING(AMQP_MESSENGER_REASON, reason)); + twin_op_ctx->on_report_state_complete_callback(callback_result, callback_reason, 0, (void*)twin_op_ctx->on_report_state_complete_context); + } + } + else if (reason != AMQP_MESSENGER_REASON_MESSENGER_DESTROYED) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_096: [If operation is a GET/PUT/DELETE, if a failure occurs the TWIN messenger shall attempt to subscribe/unsubscribe again] + LogError("Failed sending TWIN operation request (%s, %s, %s, %s, %s)", + twin_op_ctx->msgr->device_id, + ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), + twin_op_ctx->correlation_id, + ENUM_TO_STRING(AMQP_MESSENGER_SEND_RESULT, result), ENUM_TO_STRING(AMQP_MESSENGER_REASON, reason)); - if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET && - twin_op_ctx->msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) - { - twin_op_ctx->msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; - twin_op_ctx->msgr->subscription_error_count++; - } - else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PUT && - twin_op_ctx->msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) - { - twin_op_ctx->msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; - twin_op_ctx->msgr->subscription_error_count++; - } - } + if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET && + twin_op_ctx->msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) + { + twin_op_ctx->msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; + twin_op_ctx->msgr->subscription_error_count++; + } + else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PUT && + twin_op_ctx->msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) + { + twin_op_ctx->msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; + twin_op_ctx->msgr->subscription_error_count++; + } + } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_097: [If a failure occurred, the current operation shall be removed from `twin_msgr->operations`] - if (remove_twin_operation_context_from_queue(twin_op_ctx) != RESULT_OK) - { - update_state(twin_op_ctx->msgr, TWIN_MESSENGER_STATE_ERROR); - } - else - { - destroy_twin_operation_context(twin_op_ctx); - } - } - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_097: [If a failure occurred, the current operation shall be removed from `twin_msgr->operations`] + if (remove_twin_operation_context_from_queue(twin_op_ctx) != RESULT_OK) + { + update_state(twin_op_ctx->msgr, TWIN_MESSENGER_STATE_ERROR); + } + else + { + destroy_twin_operation_context(twin_op_ctx); + } + } + } } static int send_twin_operation_request(TWIN_MESSENGER_INSTANCE* twin_msgr, TWIN_OPERATION_CONTEXT* op_ctx, CONSTBUFFER_HANDLE data) { - int result; - MESSAGE_HANDLE amqp_message; + int result; + MESSAGE_HANDLE amqp_message; - if ((amqp_message = create_amqp_message_for_twin_operation(op_ctx->type, op_ctx->correlation_id, data)) == NULL) - { - LogError("Failed creating request message (%s, %s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_ctx->type), op_ctx->correlation_id); - result = __FAILURE__; - } - else - { - if ((op_ctx->time_sent = get_time(NULL)) == INDEFINITE_TIME) - { - LogError("Failed setting TWIN operation sent time (%s, %s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_ctx->type), op_ctx->correlation_id); - result = __FAILURE__; - } - else if (amqp_messenger_send_async(twin_msgr->amqp_msgr, amqp_message, on_amqp_send_complete_callback, (void*)op_ctx) != 0) - { - LogError("Failed sending request message for (%s, %s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_ctx->type), op_ctx->correlation_id); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } + if ((amqp_message = create_amqp_message_for_twin_operation(op_ctx->type, op_ctx->correlation_id, data)) == NULL) + { + LogError("Failed creating request message (%s, %s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_ctx->type), op_ctx->correlation_id); + result = __FAILURE__; + } + else + { + if ((op_ctx->time_sent = get_time(NULL)) == INDEFINITE_TIME) + { + LogError("Failed setting TWIN operation sent time (%s, %s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_ctx->type), op_ctx->correlation_id); + result = __FAILURE__; + } + else if (amqp_messenger_send_async(twin_msgr->amqp_msgr, amqp_message, on_amqp_send_complete_callback, (void*)op_ctx) != 0) + { + LogError("Failed sending request message for (%s, %s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_ctx->type), op_ctx->correlation_id); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } - message_destroy(amqp_message); - } + message_destroy(amqp_message); + } - return result; + return result; } @@ -977,375 +977,371 @@ static bool remove_expired_twin_patch_request(const void* item, const void* match_context, bool* continue_processing) { - bool remove_item; + bool remove_item; - if (item == NULL || match_context == NULL || continue_processing == NULL) - { - LogError("Invalid argument (item=%p, match_context=%p, continue_processing=%p)", item, match_context, continue_processing); - *continue_processing = false; - remove_item = false; - } - else - { + if (item == NULL || match_context == NULL || continue_processing == NULL) + { + LogError("Invalid argument (item=%p, match_context=%p, continue_processing=%p)", item, match_context, continue_processing); + remove_item = false; + } + else + { - time_t current_time = *(time_t*)match_context; - TWIN_PATCH_OPERATION_CONTEXT* twin_patch_ctx = (TWIN_PATCH_OPERATION_CONTEXT*)item; + time_t current_time = *(time_t*)match_context; + TWIN_PATCH_OPERATION_CONTEXT* twin_patch_ctx = (TWIN_PATCH_OPERATION_CONTEXT*)item; - if (get_difftime(current_time, twin_patch_ctx->time_enqueued) >= DEFAULT_TWIN_OPERATION_TIMEOUT_SECS) - { - remove_item = true; - *continue_processing = true; + if (get_difftime(current_time, twin_patch_ctx->time_enqueued) >= DEFAULT_TWIN_OPERATION_TIMEOUT_SECS) + { + remove_item = true; + *continue_processing = true; - if (twin_patch_ctx->on_report_state_complete_callback != NULL) - { - twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_TIMEOUT, 0, twin_patch_ctx->on_report_state_complete_context); - } + if (twin_patch_ctx->on_report_state_complete_callback != NULL) + { + twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_TIMEOUT, 0, twin_patch_ctx->on_report_state_complete_context); + } - CONSTBUFFER_Destroy(twin_patch_ctx->data); - free(twin_patch_ctx); - } - else - { - remove_item = false; - *continue_processing = false; - } - } + CONSTBUFFER_Destroy(twin_patch_ctx->data); + free(twin_patch_ctx); + } + else + { + remove_item = false; + *continue_processing = false; + } + } - return remove_item; + return remove_item; } static bool remove_expired_twin_operation_request(const void* item, const void* match_context, bool* continue_processing) { - bool result; + bool result; - if (item == NULL || match_context == NULL || continue_processing == NULL) - { - LogError("Invalid argument (item=%p, match_context=%p, continue_processing=%p)", item, match_context, continue_processing); - *continue_processing = false; - result = false; - } - else - { - TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)item; - TWIN_MESSENGER_INSTANCE* twin_msgr = twin_op_ctx->msgr; - time_t current_time = *(time_t*)match_context; + if (item == NULL || match_context == NULL || continue_processing == NULL) + { + LogError("Invalid argument (item=%p, match_context=%p, continue_processing=%p)", item, match_context, continue_processing); + result = false; + } + else + { + TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)item; + TWIN_MESSENGER_INSTANCE* twin_msgr = twin_op_ctx->msgr; + time_t current_time = *(time_t*)match_context; - if (get_difftime(current_time, twin_op_ctx->time_sent) < DEFAULT_TWIN_OPERATION_TIMEOUT_SECS) - { - result = false; - // All next elements in the list have a later time_sent, so they won't be expired, and don't need to be removed. - *continue_processing = false; - } - else - { - LogError("Twin operation timed out (%s, %s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), twin_op_ctx->correlation_id); - result = true; - *continue_processing = true; + if (get_difftime(current_time, twin_op_ctx->time_sent) < DEFAULT_TWIN_OPERATION_TIMEOUT_SECS) + { + result = false; + // All next elements in the list have a later time_sent, so they won't be expired, and don't need to be removed. + *continue_processing = false; + } + else + { + LogError("Twin operation timed out (%s, %s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), twin_op_ctx->correlation_id); + result = true; + *continue_processing = true; - if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_081: [If a timed-out item is a reported property PATCH, `on_report_state_complete_callback` shall be invoked with RESULT_ERROR and REASON_TIMEOUT] - if (twin_op_ctx->on_report_state_complete_callback != NULL) - { - twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_TIMEOUT, 0, twin_op_ctx->on_report_state_complete_context); - } - } - else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET) - { - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) - { - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; - twin_msgr->subscription_error_count++; - } - } - else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PUT) - { - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBING) - { - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES; - twin_msgr->subscription_error_count++; - } - } - else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_DELETE) - { - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING) - { - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE; - twin_msgr->subscription_error_count++; - } - } + if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_081: [If a timed-out item is a reported property PATCH, `on_report_state_complete_callback` shall be invoked with RESULT_ERROR and REASON_TIMEOUT] + if (twin_op_ctx->on_report_state_complete_callback != NULL) + { + twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_TIMEOUT, 0, twin_op_ctx->on_report_state_complete_context); + } + } + else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET) + { + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) + { + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; + twin_msgr->subscription_error_count++; + } + } + else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PUT) + { + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBING) + { + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES; + twin_msgr->subscription_error_count++; + } + } + else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_DELETE) + { + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING) + { + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE; + twin_msgr->subscription_error_count++; + } + } - destroy_twin_operation_context(twin_op_ctx); - } - } + destroy_twin_operation_context(twin_op_ctx); + } + } - return result; + return result; } static void process_timeouts(TWIN_MESSENGER_INSTANCE* twin_msgr) { - time_t current_time; + time_t current_time; - if ((current_time = get_time(NULL)) == INDEFINITE_TIME) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_082: [If any failure occurs while verifying/removing timed-out items `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_ERROR and user informed] - LogError("Failed obtaining current time (%s)", twin_msgr->device_id); - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_080: [twin_messenger_do_work() shall remove and destroy any timed out items from `twin_msgr->pending_patches` and `twin_msgr->operations`] - (void)singlylinkedlist_remove_if(twin_msgr->pending_patches, remove_expired_twin_patch_request, (const void*)¤t_time); - (void)singlylinkedlist_remove_if(twin_msgr->operations, remove_expired_twin_operation_request, (const void*)¤t_time); - } + if ((current_time = get_time(NULL)) == INDEFINITE_TIME) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_082: [If any failure occurs while verifying/removing timed-out items `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_ERROR and user informed] + LogError("Failed obtaining current time (%s)", twin_msgr->device_id); + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_080: [twin_messenger_do_work() shall remove and destroy any timed out items from `twin_msgr->pending_patches` and `twin_msgr->operations`] + (void)singlylinkedlist_remove_if(twin_msgr->pending_patches, remove_expired_twin_patch_request, (const void*)¤t_time); + (void)singlylinkedlist_remove_if(twin_msgr->operations, remove_expired_twin_operation_request, (const void*)¤t_time); + } } static bool send_pending_twin_patch(const void* item, const void* match_context, bool* continue_processing) { - bool result; + bool result; - if (item == NULL || match_context == NULL || continue_processing == NULL) - { - LogError("Invalid argument (item=%p, match_context=%p, continue_processing=%p)", item, match_context, continue_processing); - *continue_processing = false; - result = false; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)match_context; - TWIN_PATCH_OPERATION_CONTEXT* twin_patch_ctx = (TWIN_PATCH_OPERATION_CONTEXT*)item; - TWIN_OPERATION_CONTEXT* twin_op_ctx; + if (item == NULL || match_context == NULL || continue_processing == NULL) + { + LogError("Invalid argument (item=%p, match_context=%p, continue_processing=%p)", item, match_context, continue_processing); + result = false; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)match_context; + TWIN_PATCH_OPERATION_CONTEXT* twin_patch_ctx = (TWIN_PATCH_OPERATION_CONTEXT*)item; + TWIN_OPERATION_CONTEXT* twin_op_ctx; - if ((twin_op_ctx = create_twin_operation_context(twin_msgr, TWIN_OPERATION_TYPE_PATCH)) == NULL) - { - LogError("Failed creating context for sending reported state (%s)", twin_msgr->device_id); + if ((twin_op_ctx = create_twin_operation_context(twin_msgr, TWIN_OPERATION_TYPE_PATCH)) == NULL) + { + LogError("Failed creating context for sending reported state (%s)", twin_msgr->device_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_061: [If any other failure occurs sending the PATCH request, `on_report_state_complete_callback` shall be invoked with RESULT_ERROR and REASON_INTERNAL_ERROR] - if (twin_patch_ctx->on_report_state_complete_callback != NULL) - { - twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_INTERNAL_ERROR, 0, twin_patch_ctx->on_report_state_complete_context); - } - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_062: [If amqp_send_async() succeeds, the PATCH request shall be queued into `twin_msgr->operations`] - else if (add_twin_operation_context_to_queue(twin_op_ctx) != RESULT_OK) - { - LogError("Failed adding TWIN operation context to queue (%s)", twin_msgr->device_id); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_061: [If any other failure occurs sending the PATCH request, `on_report_state_complete_callback` shall be invoked with RESULT_ERROR and REASON_INTERNAL_ERROR] + if (twin_patch_ctx->on_report_state_complete_callback != NULL) + { + twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_INTERNAL_ERROR, 0, twin_patch_ctx->on_report_state_complete_context); + } + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_062: [If amqp_send_async() succeeds, the PATCH request shall be queued into `twin_msgr->operations`] + else if (add_twin_operation_context_to_queue(twin_op_ctx) != RESULT_OK) + { + LogError("Failed adding TWIN operation context to queue (%s)", twin_msgr->device_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_061: [If any other failure occurs sending the PATCH request, `on_report_state_complete_callback` shall be invoked with RESULT_ERROR and REASON_INTERNAL_ERROR] - if (twin_patch_ctx->on_report_state_complete_callback != NULL) - { - twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_INTERNAL_ERROR, 0, twin_patch_ctx->on_report_state_complete_context); - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_061: [If any other failure occurs sending the PATCH request, `on_report_state_complete_callback` shall be invoked with RESULT_ERROR and REASON_INTERNAL_ERROR] + if (twin_patch_ctx->on_report_state_complete_callback != NULL) + { + twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_INTERNAL_ERROR, 0, twin_patch_ctx->on_report_state_complete_context); + } - destroy_twin_operation_context(twin_op_ctx); - } - else - { - twin_op_ctx->on_report_state_complete_callback = twin_patch_ctx->on_report_state_complete_callback; - twin_op_ctx->on_report_state_complete_context = twin_patch_ctx->on_report_state_complete_context; + destroy_twin_operation_context(twin_op_ctx); + } + else + { + twin_op_ctx->on_report_state_complete_callback = twin_patch_ctx->on_report_state_complete_callback; + twin_op_ctx->on_report_state_complete_context = twin_patch_ctx->on_report_state_complete_context; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_059: [If reported property PATCH shall be sent as an uAMQP MESSAGE_HANDLE instance using amqp_send_async() passing `on_amqp_send_complete_callback`] - if (send_twin_operation_request(twin_msgr, twin_op_ctx, twin_patch_ctx->data) != RESULT_OK) - { - LogError("Failed sending reported state (%s)", twin_msgr->device_id); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_059: [If reported property PATCH shall be sent as an uAMQP MESSAGE_HANDLE instance using amqp_send_async() passing `on_amqp_send_complete_callback`] + if (send_twin_operation_request(twin_msgr, twin_op_ctx, twin_patch_ctx->data) != RESULT_OK) + { + LogError("Failed sending reported state (%s)", twin_msgr->device_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_060: [If amqp_send_async() fails, `on_report_state_complete_callback` shall be invoked with RESULT_ERROR and REASON_FAIL_SENDING] - if (twin_patch_ctx->on_report_state_complete_callback != NULL) - { - twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_FAIL_SENDING, 0, twin_patch_ctx->on_report_state_complete_context); - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_060: [If amqp_send_async() fails, `on_report_state_complete_callback` shall be invoked with RESULT_ERROR and REASON_FAIL_SENDING] + if (twin_patch_ctx->on_report_state_complete_callback != NULL) + { + twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_FAIL_SENDING, 0, twin_patch_ctx->on_report_state_complete_context); + } - (void)remove_twin_operation_context_from_queue(twin_op_ctx); - destroy_twin_operation_context(twin_op_ctx); - } - } + (void)remove_twin_operation_context_from_queue(twin_op_ctx); + destroy_twin_operation_context(twin_op_ctx); + } + } - CONSTBUFFER_Destroy(twin_patch_ctx->data); - free(twin_patch_ctx); + CONSTBUFFER_Destroy(twin_patch_ctx->data); + free(twin_patch_ctx); - *continue_processing = true; - result = true; - } + *continue_processing = true; + result = true; + } - return result; + return result; } static void process_twin_subscription(TWIN_MESSENGER_INSTANCE* twin_msgr) { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_078: [If failures occur sending subscription requests to the service for more than 3 times, TWIN messenger shall set its state to TWIN_MESSENGER_STATE_ERROR and inform the user] - if (twin_msgr->subscription_error_count >= DEFAULT_MAX_TWIN_SUBSCRIPTION_ERROR_COUNT) - { - LogError("Maximum number of TWIN subscription-related failures reached (%s, %d)", twin_msgr->device_id, twin_msgr->subscription_error_count); - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - } - else - { - TWIN_OPERATION_TYPE op_type = TWIN_OPERATION_TYPE_PATCH; - TWIN_SUBSCRIPTION_STATE next_subscription_state = twin_msgr->subscription_state; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_078: [If failures occur sending subscription requests to the service for more than 3 times, TWIN messenger shall set its state to TWIN_MESSENGER_STATE_ERROR and inform the user] + if (twin_msgr->subscription_error_count >= DEFAULT_MAX_TWIN_SUBSCRIPTION_ERROR_COUNT) + { + LogError("Maximum number of TWIN subscription-related failures reached (%s, %d)", twin_msgr->device_id, twin_msgr->subscription_error_count); + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + } + else + { + TWIN_OPERATION_TYPE op_type = TWIN_OPERATION_TYPE_PATCH; + TWIN_SUBSCRIPTION_STATE next_subscription_state = twin_msgr->subscription_state; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_074: [If subscribing, twin_messenger_do_work() shall request a complete desired properties report] - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES) - { - op_type = TWIN_OPERATION_TYPE_GET; - next_subscription_state = TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_075: [If a complete desired report has been received, the TWIN messenger shall request partial updates to desired properties] - else if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES) - { - op_type = TWIN_OPERATION_TYPE_PUT; - next_subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBING; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_076: [If unsubscribing, twin_messenger_do_work() shall send a DELETE request to the service] - else if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE) - { - op_type = TWIN_OPERATION_TYPE_PUT; - next_subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING; - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_074: [If subscribing, twin_messenger_do_work() shall request a complete desired properties report] + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES) + { + op_type = TWIN_OPERATION_TYPE_GET; + next_subscription_state = TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_075: [If a complete desired report has been received, the TWIN messenger shall request partial updates to desired properties] + else if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES) + { + op_type = TWIN_OPERATION_TYPE_PUT; + next_subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBING; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_076: [If unsubscribing, twin_messenger_do_work() shall send a DELETE request to the service] + else if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE) + { + op_type = TWIN_OPERATION_TYPE_PUT; + next_subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING; + } - if (next_subscription_state != twin_msgr->subscription_state) - { - TWIN_OPERATION_CONTEXT* twin_op_ctx; + if (next_subscription_state != twin_msgr->subscription_state) + { + TWIN_OPERATION_CONTEXT* twin_op_ctx; - if ((twin_op_ctx = create_twin_operation_context(twin_msgr, op_type)) == NULL) - { - LogError("Failed creating a context for TWIN request (%s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_type)); - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - } - else - { - if (add_twin_operation_context_to_queue(twin_op_ctx) != RESULT_OK) - { - LogError("Failed queueing TWIN request context (%s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_type)); - destroy_twin_operation_context(twin_op_ctx); - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - } - else if (send_twin_operation_request(twin_msgr, twin_op_ctx, NULL) != RESULT_OK) - { - LogError("Failed sending TWIN request (%s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_type)); + if ((twin_op_ctx = create_twin_operation_context(twin_msgr, op_type)) == NULL) + { + LogError("Failed creating a context for TWIN request (%s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_type)); + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + } + else + { + if (add_twin_operation_context_to_queue(twin_op_ctx) != RESULT_OK) + { + LogError("Failed queueing TWIN request context (%s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_type)); + destroy_twin_operation_context(twin_op_ctx); + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + } + else if (send_twin_operation_request(twin_msgr, twin_op_ctx, NULL) != RESULT_OK) + { + LogError("Failed sending TWIN request (%s, %s)", twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, op_type)); - (void)remove_twin_operation_context_from_queue(twin_op_ctx); - destroy_twin_operation_context(twin_op_ctx); - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - } - else - { - twin_msgr->subscription_state = next_subscription_state; - } - } - } - } + (void)remove_twin_operation_context_from_queue(twin_op_ctx); + destroy_twin_operation_context(twin_op_ctx); + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + } + else + { + twin_msgr->subscription_state = next_subscription_state; + } + } + } + } } static bool cancel_all_pending_twin_operations(const void* item, const void* match_context, bool* continue_processing) { - bool result; + bool result; - if (item == NULL || continue_processing == NULL) - { - LogError("Invalid argument (item=%p, continue_processing=%p)", item, continue_processing); - *continue_processing = false; - result = false; - } - else - { - TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)item; - (void)match_context; + if (item == NULL || continue_processing == NULL) + { + LogError("Invalid argument (item=%p, continue_processing=%p)", item, continue_processing); + result = false; + } + else + { + TWIN_OPERATION_CONTEXT* twin_op_ctx = (TWIN_OPERATION_CONTEXT*)item; + (void)match_context; - if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH) - { - if (twin_op_ctx->on_report_state_complete_callback != NULL) - { - twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_CANCELLED, TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED, 0, twin_op_ctx->on_report_state_complete_context); - } - } + if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH) + { + if (twin_op_ctx->on_report_state_complete_callback != NULL) + { + twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_CANCELLED, TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED, 0, twin_op_ctx->on_report_state_complete_context); + } + } - destroy_twin_operation_context(twin_op_ctx); + destroy_twin_operation_context(twin_op_ctx); - *continue_processing = true; - result = true; - } + *continue_processing = true; + result = true; + } - return result; + return result; } static bool cancel_pending_twin_patch_operation(const void* item, const void* match_context, bool* continue_processing) { - bool result; + bool result; - if (item == NULL) - { - LogError("Invalid argument (item is NULL)"); - *continue_processing = false; - result = false; - } - else - { - TWIN_PATCH_OPERATION_CONTEXT* twin_patch_ctx = (TWIN_PATCH_OPERATION_CONTEXT*)item; - (void)match_context; + if (item == NULL) + { + LogError("Invalid argument (item is NULL)"); + *continue_processing = false; + result = false; + } + else + { + TWIN_PATCH_OPERATION_CONTEXT* twin_patch_ctx = (TWIN_PATCH_OPERATION_CONTEXT*)item; + (void)match_context; - if (twin_patch_ctx->on_report_state_complete_callback != NULL) - { - twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_CANCELLED, TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED, 0, twin_patch_ctx->on_report_state_complete_context); - } + if (twin_patch_ctx->on_report_state_complete_callback != NULL) + { + twin_patch_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_CANCELLED, TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED, 0, twin_patch_ctx->on_report_state_complete_context); + } - CONSTBUFFER_Destroy(twin_patch_ctx->data); - free(twin_patch_ctx); + CONSTBUFFER_Destroy(twin_patch_ctx->data); + free(twin_patch_ctx); - *continue_processing = true; - result = true; - } + *continue_processing = true; + result = true; + } - return result; + return result; } static void internal_twin_messenger_destroy(TWIN_MESSENGER_INSTANCE* twin_msgr) { - if (twin_msgr->amqp_msgr != NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_099: [`twin_msgr->amqp_messenger` shall be destroyed using amqp_messenger_destroy()] - amqp_messenger_destroy(twin_msgr->amqp_msgr); - } + if (twin_msgr->amqp_msgr != NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_099: [`twin_msgr->amqp_messenger` shall be destroyed using amqp_messenger_destroy()] + amqp_messenger_destroy(twin_msgr->amqp_msgr); + } - if (twin_msgr->pending_patches != NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_100: [All elements of `twin_msgr->pending_patches` shall be removed, invoking `on_report_state_complete_callback` for each with TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED] - if (singlylinkedlist_remove_if(twin_msgr->pending_patches, cancel_pending_twin_patch_operation, twin_msgr) != 0) - { - LogError("Failed removing pending desired properties PATCH operation (%s)", twin_msgr->device_id); - } + if (twin_msgr->pending_patches != NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_100: [All elements of `twin_msgr->pending_patches` shall be removed, invoking `on_report_state_complete_callback` for each with TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED] + if (singlylinkedlist_remove_if(twin_msgr->pending_patches, cancel_pending_twin_patch_operation, twin_msgr) != 0) + { + LogError("Failed removing pending desired properties PATCH operation (%s)", twin_msgr->device_id); + } - singlylinkedlist_destroy(twin_msgr->pending_patches); - } + singlylinkedlist_destroy(twin_msgr->pending_patches); + } - if (twin_msgr->operations != NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_101: [All elements of `twin_msgr->operations` shall be removed, invoking `on_report_state_complete_callback` for each PATCH with TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED] - singlylinkedlist_remove_if(twin_msgr->operations, cancel_all_pending_twin_operations, (const void*)twin_msgr); - singlylinkedlist_destroy(twin_msgr->operations); - } + if (twin_msgr->operations != NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_101: [All elements of `twin_msgr->operations` shall be removed, invoking `on_report_state_complete_callback` for each PATCH with TWIN_REPORT_STATE_REASON_MESSENGER_DESTROYED] + singlylinkedlist_remove_if(twin_msgr->operations, cancel_all_pending_twin_operations, (const void*)twin_msgr); + singlylinkedlist_destroy(twin_msgr->operations); + } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_102: [twin_messenger_destroy() shall release all memory allocated for and within `twin_msgr`] - if (twin_msgr->client_version != NULL) - { - free(twin_msgr->client_version); - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_102: [twin_messenger_destroy() shall release all memory allocated for and within `twin_msgr`] + if (twin_msgr->client_version != NULL) + { + free(twin_msgr->client_version); + } - if (twin_msgr->device_id != NULL) - { - free(twin_msgr->device_id); - } - - if (twin_msgr->module_id != NULL) - { - free(twin_msgr->module_id); - } + if (twin_msgr->device_id != NULL) + { + free(twin_msgr->device_id); + } - if (twin_msgr->iothub_host_fqdn != NULL) - { - free(twin_msgr->iothub_host_fqdn); - } + if (twin_msgr->module_id != NULL) + { + free(twin_msgr->module_id); + } - free(twin_msgr); + if (twin_msgr->iothub_host_fqdn != NULL) + { + free(twin_msgr->iothub_host_fqdn); + } + + free(twin_msgr); } @@ -1354,302 +1350,302 @@ static AMQP_MESSENGER_DISPOSITION_RESULT on_amqp_message_received_callback(MESSAGE_HANDLE message, AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info, void* context) { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_084: [If `message` or `context` are NULL, on_amqp_message_received_callback shall return immediately] - AMQP_MESSENGER_DISPOSITION_RESULT disposition_result; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_084: [If `message` or `context` are NULL, on_amqp_message_received_callback shall return immediately] + AMQP_MESSENGER_DISPOSITION_RESULT disposition_result; - if (message == NULL || context == NULL) - { - LogError("Invalid argument (message=%p, context=%p)", message, context); - disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)context; + if (message == NULL || context == NULL) + { + LogError("Invalid argument (message=%p, context=%p)", message, context); + disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)context; - char* correlation_id; + char* correlation_id; + + bool has_status_code; + int status_code; + + bool has_version; + int64_t version; - bool has_status_code; - int status_code; - - bool has_version; - int64_t version; - - bool has_twin_report; - BINARY_DATA twin_report; + bool has_twin_report; + BINARY_DATA twin_report; - amqp_messenger_destroy_disposition_info(disposition_info); - disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED; + amqp_messenger_destroy_disposition_info(disposition_info); + disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED; - if (parse_incoming_twin_message(message, &correlation_id, &has_version, &version, &has_status_code, &status_code, &has_twin_report, &twin_report) != 0) - { - LogError("Failed parsing incoming TWIN message (%s)", twin_msgr->device_id); - } - else - { - if (correlation_id != NULL) - { - // It is supposed to be a request sent previously (reported properties PATCH, GET, PUT or DELETE). + if (parse_incoming_twin_message(message, &correlation_id, &has_version, &version, &has_status_code, &status_code, &has_twin_report, &twin_report) != 0) + { + LogError("Failed parsing incoming TWIN message (%s)", twin_msgr->device_id); + } + else + { + if (correlation_id != NULL) + { + // It is supposed to be a request sent previously (reported properties PATCH, GET, PUT or DELETE). - LIST_ITEM_HANDLE list_item; - if ((list_item = singlylinkedlist_find(twin_msgr->operations, find_twin_operation_by_correlation_id, (const void*)correlation_id)) == NULL) - { - LogError("Could not find context of TWIN incoming message (%s, %s)", twin_msgr->device_id, correlation_id); - } - else - { - TWIN_OPERATION_CONTEXT* twin_op_ctx; + LIST_ITEM_HANDLE list_item; + if ((list_item = singlylinkedlist_find(twin_msgr->operations, find_twin_operation_by_correlation_id, (const void*)correlation_id)) == NULL) + { + LogError("Could not find context of TWIN incoming message (%s, %s)", twin_msgr->device_id, correlation_id); + } + else + { + TWIN_OPERATION_CONTEXT* twin_op_ctx; - if ((twin_op_ctx = (TWIN_OPERATION_CONTEXT*)singlylinkedlist_item_get_value(list_item)) == NULL) - { - LogError("Could not get context for incoming TWIN message (%s, %s)", twin_msgr->device_id, correlation_id); - } - else - { - if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH) - { - if (!has_status_code) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_086: [If `message` is a failed response for a PATCH request, the `on_report_state_complete_callback` shall be invoked if provided passing RESULT_ERROR and the status_code zero] - LogError("Received an incoming TWIN message for a PATCH operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id); + if ((twin_op_ctx = (TWIN_OPERATION_CONTEXT*)singlylinkedlist_item_get_value(list_item)) == NULL) + { + LogError("Could not get context for incoming TWIN message (%s, %s)", twin_msgr->device_id, correlation_id); + } + else + { + if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH) + { + if (!has_status_code) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_086: [If `message` is a failed response for a PATCH request, the `on_report_state_complete_callback` shall be invoked if provided passing RESULT_ERROR and the status_code zero] + LogError("Received an incoming TWIN message for a PATCH operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id); + + disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED; - disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED; - - if (twin_op_ctx->on_report_state_complete_callback != NULL) - { - twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_INVALID_RESPONSE, 0, twin_op_ctx->on_report_state_complete_context); - } - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_085: [If `message` is a success response for a PATCH request, the `on_report_state_complete_callback` shall be invoked if provided passing RESULT_SUCCESS and the status_code received] - if (twin_op_ctx->on_report_state_complete_callback != NULL) - { - twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_SUCCESS, TWIN_REPORT_STATE_REASON_NONE, status_code, twin_op_ctx->on_report_state_complete_context); - } - } - } - else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET) - { - if (!has_twin_report) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_089: [If `message` is a failed response for a GET request, the TWIN messenger shall attempt to send another GET request] - LogError("Received an incoming TWIN message for a GET operation, but with no report (%s, %s)", twin_msgr->device_id, correlation_id); + if (twin_op_ctx->on_report_state_complete_callback != NULL) + { + twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_INVALID_RESPONSE, 0, twin_op_ctx->on_report_state_complete_context); + } + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_085: [If `message` is a success response for a PATCH request, the `on_report_state_complete_callback` shall be invoked if provided passing RESULT_SUCCESS and the status_code received] + if (twin_op_ctx->on_report_state_complete_callback != NULL) + { + twin_op_ctx->on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT_SUCCESS, TWIN_REPORT_STATE_REASON_NONE, status_code, twin_op_ctx->on_report_state_complete_context); + } + } + } + else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET) + { + if (!has_twin_report) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_089: [If `message` is a failed response for a GET request, the TWIN messenger shall attempt to send another GET request] + LogError("Received an incoming TWIN message for a GET operation, but with no report (%s, %s)", twin_msgr->device_id, correlation_id); - disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED; + disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED; - if (twin_op_ctx->msgr->on_message_received_callback != NULL) - { - twin_op_ctx->msgr->on_message_received_callback(TWIN_UPDATE_TYPE_COMPLETE, NULL, 0, twin_op_ctx->msgr->on_message_received_context); - } + if (twin_op_ctx->msgr->on_message_received_callback != NULL) + { + twin_op_ctx->msgr->on_message_received_callback(TWIN_UPDATE_TYPE_COMPLETE, NULL, 0, twin_op_ctx->msgr->on_message_received_context); + } - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) - { - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; - twin_msgr->subscription_error_count++; - } - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_087: [If `message` is a success response for a GET request, `on_message_received_callback` shall be invoked with TWIN_UPDATE_TYPE_COMPLETE and the message body received] - if (twin_op_ctx->msgr->on_message_received_callback != NULL) - { - twin_op_ctx->msgr->on_message_received_callback(TWIN_UPDATE_TYPE_COMPLETE, (const char*)twin_report.bytes, twin_report.length, twin_op_ctx->msgr->on_message_received_context); - } + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) + { + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; + twin_msgr->subscription_error_count++; + } + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_087: [If `message` is a success response for a GET request, `on_message_received_callback` shall be invoked with TWIN_UPDATE_TYPE_COMPLETE and the message body received] + if (twin_op_ctx->msgr->on_message_received_callback != NULL) + { + twin_op_ctx->msgr->on_message_received_callback(TWIN_UPDATE_TYPE_COMPLETE, (const char*)twin_report.bytes, twin_report.length, twin_op_ctx->msgr->on_message_received_context); + } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_088: [If `message` is a success response for a GET request, the TWIN messenger shall trigger the subscription for partial updates] - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) - { - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES; - twin_msgr->subscription_error_count = 0; - } - } - } - else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PUT) - { - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBED) - { - bool subscription_succeeded = true; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_088: [If `message` is a success response for a GET request, the TWIN messenger shall trigger the subscription for partial updates] + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES) + { + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES; + twin_msgr->subscription_error_count = 0; + } + } + } + else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PUT) + { + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBED) + { + bool subscription_succeeded = true; + + if (!has_status_code) + { + LogError("Received an incoming TWIN message for a PUT operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id); + + subscription_succeeded = false; + } + else if (status_code < 200 || status_code >= 300) + { + LogError("Received status code %d for TWIN subscription request (%s, %s)", status_code, twin_msgr->device_id, correlation_id); - if (!has_status_code) - { - LogError("Received an incoming TWIN message for a PUT operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id); - - subscription_succeeded = false; - } - else if (status_code < 200 || status_code >= 300) - { - LogError("Received status code %d for TWIN subscription request (%s, %s)", status_code, twin_msgr->device_id, correlation_id); - - subscription_succeeded = false; - } + subscription_succeeded = false; + } - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBING) - { - if (subscription_succeeded) - { - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBED; - twin_msgr->subscription_error_count = 0; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_090: [If `message` is a failed response for a PUT request, the TWIN messenger shall attempt to send another PUT request] - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES; - twin_msgr->subscription_error_count++; - } - } - } - } - else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_DELETE) - { - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED) - { - bool unsubscription_succeeded = true; + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBING) + { + if (subscription_succeeded) + { + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBED; + twin_msgr->subscription_error_count = 0; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_090: [If `message` is a failed response for a PUT request, the TWIN messenger shall attempt to send another PUT request] + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES; + twin_msgr->subscription_error_count++; + } + } + } + } + else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_DELETE) + { + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED) + { + bool unsubscription_succeeded = true; + + if (!has_status_code) + { + LogError("Received an incoming TWIN message for a DELETE operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id); - if (!has_status_code) - { - LogError("Received an incoming TWIN message for a DELETE operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id); - - unsubscription_succeeded = false; - } - else if (status_code < 200 || status_code >= 300) - { - LogError("Received status code %d for TWIN unsubscription request (%s, %s)", status_code, twin_msgr->device_id, correlation_id); - - unsubscription_succeeded = false; - } + unsubscription_succeeded = false; + } + else if (status_code < 200 || status_code >= 300) + { + LogError("Received status code %d for TWIN unsubscription request (%s, %s)", status_code, twin_msgr->device_id, correlation_id); + + unsubscription_succeeded = false; + } - if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING) - { - if (unsubscription_succeeded) - { - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED; - twin_msgr->subscription_error_count = 0; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_091: [If `message` is a failed response for a DELETE request, the TWIN messenger shall attempt to send another DELETE request] - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE; - twin_msgr->subscription_error_count++; - } - } - } - } + if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING) + { + if (unsubscription_succeeded) + { + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED; + twin_msgr->subscription_error_count = 0; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_091: [If `message` is a failed response for a DELETE request, the TWIN messenger shall attempt to send another DELETE request] + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE; + twin_msgr->subscription_error_count++; + } + } + } + } + + destroy_twin_operation_context(twin_op_ctx); + } - destroy_twin_operation_context(twin_op_ctx); - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_092: [The corresponding TWIN request shall be removed from `twin_msgr->operations` and destroyed] + if (singlylinkedlist_remove(twin_msgr->operations, list_item) != 0) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_093: [The corresponding TWIN request failed to be removed from `twin_msgr->operations`, `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_ERROR and informed to the user] + LogError("Failed removing context for incoming TWIN message (%s, %s)", + twin_msgr->device_id, correlation_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_092: [The corresponding TWIN request shall be removed from `twin_msgr->operations` and destroyed] - if (singlylinkedlist_remove(twin_msgr->operations, list_item) != 0) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_093: [The corresponding TWIN request failed to be removed from `twin_msgr->operations`, `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_ERROR and informed to the user] - LogError("Failed removing context for incoming TWIN message (%s, %s, %s)", - twin_msgr->device_id, ENUM_TO_STRING(TWIN_OPERATION_TYPE, twin_op_ctx->type), correlation_id); - - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - } - } + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + } + } - free(correlation_id); - } - else if (has_twin_report) - { - // It is supposed to be a desired properties delta update. + free(correlation_id); + } + else if (has_twin_report) + { + // It is supposed to be a desired properties delta update. - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_094: [If `message` is not a client request, `on_message_received_callback` shall be invoked with TWIN_UPDATE_TYPE_PARTIAL and the message body received] - if (twin_msgr->on_message_received_callback != NULL) - { - twin_msgr->on_message_received_callback(TWIN_UPDATE_TYPE_PARTIAL, (const char*)twin_report.bytes, twin_report.length, twin_msgr->on_message_received_context); - } - } - else - { - LogError("Received TWIN message with no correlation-id and no report (%s)", twin_msgr->device_id); - } - } - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_094: [If `message` is not a client request, `on_message_received_callback` shall be invoked with TWIN_UPDATE_TYPE_PARTIAL and the message body received] + if (twin_msgr->on_message_received_callback != NULL) + { + twin_msgr->on_message_received_callback(TWIN_UPDATE_TYPE_PARTIAL, (const char*)twin_report.bytes, twin_report.length, twin_msgr->on_message_received_context); + } + } + else + { + LogError("Received TWIN message with no correlation-id and no report (%s)", twin_msgr->device_id); + } + } + } - return disposition_result; + return disposition_result; } static void on_amqp_messenger_state_changed_callback(void* context, AMQP_MESSENGER_STATE previous_state, AMQP_MESSENGER_STATE new_state) { - if (context == NULL) - { - LogError("Invalid argument (context is NULL)"); - } - else if (new_state != previous_state) - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)context; + if (context == NULL) + { + LogError("Invalid argument (context is NULL)"); + } + else if (new_state != previous_state) + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)context; - if (twin_msgr->state == TWIN_MESSENGER_STATE_STARTING && new_state == AMQP_MESSENGER_STATE_STARTED) - { - if (twin_msgr->amqp_msgr_is_subscribed) - { - update_state(twin_msgr, TWIN_MESSENGER_STATE_STARTED); - } - // Else, it shall wait for the moment the AMQP msgr is subscribed. - } - else if (twin_msgr->state == TWIN_MESSENGER_STATE_STOPPING && new_state == TWIN_MESSENGER_STATE_STOPPED) - { - if (!twin_msgr->amqp_msgr_is_subscribed) - { - update_state(twin_msgr, TWIN_MESSENGER_STATE_STOPPED); - } - // Else, it shall wait for the moment the AMQP msgr is unsubscribed. - } - else if ((twin_msgr->state == TWIN_MESSENGER_STATE_STARTING && new_state == AMQP_MESSENGER_STATE_STARTING) || - (twin_msgr->state == TWIN_MESSENGER_STATE_STOPPING && new_state == AMQP_MESSENGER_STATE_STOPPING)) - { - // Do nothing, this is expected. - } - else - { - LogError("Unexpected AMQP messenger state (%s, %s, %s)", - twin_msgr->device_id, ENUM_TO_STRING(TWIN_MESSENGER_STATE, twin_msgr->state), ENUM_TO_STRING(AMQP_MESSENGER_STATE, new_state)); - - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - } + if (twin_msgr->state == TWIN_MESSENGER_STATE_STARTING && new_state == AMQP_MESSENGER_STATE_STARTED) + { + if (twin_msgr->amqp_msgr_is_subscribed) + { + update_state(twin_msgr, TWIN_MESSENGER_STATE_STARTED); + } + // Else, it shall wait for the moment the AMQP msgr is subscribed. + } + else if (twin_msgr->state == TWIN_MESSENGER_STATE_STOPPING && new_state == TWIN_MESSENGER_STATE_STOPPED) + { + if (!twin_msgr->amqp_msgr_is_subscribed) + { + update_state(twin_msgr, TWIN_MESSENGER_STATE_STOPPED); + } + // Else, it shall wait for the moment the AMQP msgr is unsubscribed. + } + else if ((twin_msgr->state == TWIN_MESSENGER_STATE_STARTING && new_state == AMQP_MESSENGER_STATE_STARTING) || + (twin_msgr->state == TWIN_MESSENGER_STATE_STOPPING && new_state == AMQP_MESSENGER_STATE_STOPPING)) + { + // Do nothing, this is expected. + } + else + { + LogError("Unexpected AMQP messenger state (%s, %s, %s)", + twin_msgr->device_id, ENUM_TO_STRING(TWIN_MESSENGER_STATE, twin_msgr->state), ENUM_TO_STRING(AMQP_MESSENGER_STATE, new_state)); - twin_msgr->amqp_msgr_state = new_state; - } + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + } + + twin_msgr->amqp_msgr_state = new_state; + } } static void on_amqp_messenger_subscription_changed_callback(void* context, bool is_subscribed) { - if (context == NULL) - { - LogError("Invalid argument (context is NULL)"); - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)context; + if (context == NULL) + { + LogError("Invalid argument (context is NULL)"); + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)context; - if (twin_msgr->state == TWIN_MESSENGER_STATE_STARTING && is_subscribed) - { - if (twin_msgr->amqp_msgr_state == AMQP_MESSENGER_STATE_STARTED) - { - update_state(twin_msgr, TWIN_MESSENGER_STATE_STARTED); - } - // Else, it shall wait for the moment the AMQP msgr is STARTED. - } - else if (twin_msgr->state == TWIN_MESSENGER_STATE_STOPPING && !is_subscribed) - { - if (twin_msgr->amqp_msgr_state == AMQP_MESSENGER_STATE_STOPPED) - { - update_state(twin_msgr, TWIN_MESSENGER_STATE_STOPPED); - } - // Else, it shall wait for the moment the AMQP msgr is STOPPED. - } - else - { - LogError("Unexpected AMQP messenger state (%s, %s, %d)", - twin_msgr->device_id, ENUM_TO_STRING(TWIN_MESSENGER_STATE, twin_msgr->state), is_subscribed); + if (twin_msgr->state == TWIN_MESSENGER_STATE_STARTING && is_subscribed) + { + if (twin_msgr->amqp_msgr_state == AMQP_MESSENGER_STATE_STARTED) + { + update_state(twin_msgr, TWIN_MESSENGER_STATE_STARTED); + } + // Else, it shall wait for the moment the AMQP msgr is STARTED. + } + else if (twin_msgr->state == TWIN_MESSENGER_STATE_STOPPING && !is_subscribed) + { + if (twin_msgr->amqp_msgr_state == AMQP_MESSENGER_STATE_STOPPED) + { + update_state(twin_msgr, TWIN_MESSENGER_STATE_STOPPED); + } + // Else, it shall wait for the moment the AMQP msgr is STOPPED. + } + else + { + LogError("Unexpected AMQP messenger state (%s, %s, %d)", + twin_msgr->device_id, ENUM_TO_STRING(TWIN_MESSENGER_STATE, twin_msgr->state), is_subscribed); - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - } + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + } - twin_msgr->amqp_msgr_is_subscribed = is_subscribed; - } + twin_msgr->amqp_msgr_is_subscribed = is_subscribed; + } } @@ -1657,467 +1653,467 @@ TWIN_MESSENGER_HANDLE twin_messenger_create(const TWIN_MESSENGER_CONFIG* messenger_config) { - TWIN_MESSENGER_INSTANCE* twin_msgr; - - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_001: [If parameter `messenger_config` is NULL, twin_messenger_create() shall return NULL] - if (messenger_config == NULL) - { - LogError("Invalid argument (messenger_config is NULL)"); - twin_msgr = NULL; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_002: [If `messenger_config`'s `device_id`, `iothub_host_fqdn` or `client_version` is NULL, twin_messenger_create() shall return NULL] - else if (messenger_config->device_id == NULL || messenger_config->iothub_host_fqdn == NULL || messenger_config->client_version == NULL) - { - LogError("Invalid argument (device_id=%p, iothub_host_fqdn=%p, client_version=%p)", - messenger_config->device_id, messenger_config->iothub_host_fqdn, messenger_config->client_version); - twin_msgr = NULL; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_003: [twin_messenger_create() shall allocate memory for the messenger instance structure (aka `twin_msgr`)] - if ((twin_msgr = (TWIN_MESSENGER_INSTANCE*)malloc(sizeof(TWIN_MESSENGER_INSTANCE))) == NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_004: [If malloc() fails, twin_messenger_create() shall fail and return NULL] - LogError("Failed allocating TWIN_MESSENGER_INSTANCE (%s)", messenger_config->device_id); - } - else - { - MAP_HANDLE link_attach_properties; + TWIN_MESSENGER_INSTANCE* twin_msgr; - memset(twin_msgr, 0, sizeof(TWIN_MESSENGER_INSTANCE)); - twin_msgr->state = TWIN_MESSENGER_STATE_STOPPED; - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED; - twin_msgr->amqp_msgr_state = AMQP_MESSENGER_STATE_STOPPED; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_001: [If parameter `messenger_config` is NULL, twin_messenger_create() shall return NULL] + if (messenger_config == NULL) + { + LogError("Invalid argument (messenger_config is NULL)"); + twin_msgr = NULL; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_002: [If `messenger_config`'s `device_id`, `iothub_host_fqdn` or `client_version` is NULL, twin_messenger_create() shall return NULL] + else if (messenger_config->device_id == NULL || messenger_config->iothub_host_fqdn == NULL || messenger_config->client_version == NULL) + { + LogError("Invalid argument (device_id=%p, iothub_host_fqdn=%p, client_version=%p)", + messenger_config->device_id, messenger_config->iothub_host_fqdn, messenger_config->client_version); + twin_msgr = NULL; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_003: [twin_messenger_create() shall allocate memory for the messenger instance structure (aka `twin_msgr`)] + if ((twin_msgr = (TWIN_MESSENGER_INSTANCE*)malloc(sizeof(TWIN_MESSENGER_INSTANCE))) == NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_004: [If malloc() fails, twin_messenger_create() shall fail and return NULL] + LogError("Failed allocating TWIN_MESSENGER_INSTANCE (%s)", messenger_config->device_id); + } + else + { + MAP_HANDLE link_attach_properties; + + memset(twin_msgr, 0, sizeof(TWIN_MESSENGER_INSTANCE)); + twin_msgr->state = TWIN_MESSENGER_STATE_STOPPED; + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED; + twin_msgr->amqp_msgr_state = AMQP_MESSENGER_STATE_STOPPED; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_005: [twin_messenger_create() shall save a copy of `messenger_config` info into `twin_msgr`] - if (mallocAndStrcpy_s(&twin_msgr->client_version, messenger_config->client_version) != 0) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_006: [If any `messenger_config` info fails to be copied, twin_messenger_create() shall fail and return NULL] - LogError("Failed copying client_version (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - else if (mallocAndStrcpy_s(&twin_msgr->device_id, messenger_config->device_id) != 0) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_006: [If any `messenger_config` info fails to be copied, twin_messenger_create() shall fail and return NULL] - LogError("Failed copying device_id (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - else if ((messenger_config->module_id != NULL) && (mallocAndStrcpy_s(&twin_msgr->module_id, messenger_config->module_id) != 0)) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_006: [If any `messenger_config` info fails to be copied, twin_messenger_create() shall fail and return NULL] - LogError("Failed copying module_id (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - else if (mallocAndStrcpy_s(&twin_msgr->iothub_host_fqdn, messenger_config->iothub_host_fqdn) != 0) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_006: [If any `messenger_config` info fails to be copied, twin_messenger_create() shall fail and return NULL] - LogError("Failed copying iothub_host_fqdn (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_007: [`twin_msgr->pending_patches` shall be set using singlylinkedlist_create()] - else if ((twin_msgr->pending_patches = singlylinkedlist_create()) == NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_008: [If singlylinkedlist_create() fails, twin_messenger_create() shall fail and return NULL] - LogError("Failed creating list for queueing patches (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_009: [`twin_msgr->operations` shall be set using singlylinkedlist_create()] - else if ((twin_msgr->operations = singlylinkedlist_create()) == NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_010: [If singlylinkedlist_create() fails, twin_messenger_create() shall fail and return NULL] - LogError("Failed creating list for operations (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - else if ((link_attach_properties = create_link_attach_properties(twin_msgr)) == NULL) - { - LogError("Failed creating link attach properties (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - else - { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_005: [twin_messenger_create() shall save a copy of `messenger_config` info into `twin_msgr`] + if (mallocAndStrcpy_s(&twin_msgr->client_version, messenger_config->client_version) != 0) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_006: [If any `messenger_config` info fails to be copied, twin_messenger_create() shall fail and return NULL] + LogError("Failed copying client_version (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + else if (mallocAndStrcpy_s(&twin_msgr->device_id, messenger_config->device_id) != 0) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_006: [If any `messenger_config` info fails to be copied, twin_messenger_create() shall fail and return NULL] + LogError("Failed copying device_id (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + else if ((messenger_config->module_id != NULL) && (mallocAndStrcpy_s(&twin_msgr->module_id, messenger_config->module_id) != 0)) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_006: [If any `messenger_config` info fails to be copied, twin_messenger_create() shall fail and return NULL] + LogError("Failed copying module_id (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + else if (mallocAndStrcpy_s(&twin_msgr->iothub_host_fqdn, messenger_config->iothub_host_fqdn) != 0) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_006: [If any `messenger_config` info fails to be copied, twin_messenger_create() shall fail and return NULL] + LogError("Failed copying iothub_host_fqdn (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_007: [`twin_msgr->pending_patches` shall be set using singlylinkedlist_create()] + else if ((twin_msgr->pending_patches = singlylinkedlist_create()) == NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_008: [If singlylinkedlist_create() fails, twin_messenger_create() shall fail and return NULL] + LogError("Failed creating list for queueing patches (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_009: [`twin_msgr->operations` shall be set using singlylinkedlist_create()] + else if ((twin_msgr->operations = singlylinkedlist_create()) == NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_010: [If singlylinkedlist_create() fails, twin_messenger_create() shall fail and return NULL] + LogError("Failed creating list for operations (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + else if ((link_attach_properties = create_link_attach_properties(twin_msgr)) == NULL) + { + LogError("Failed creating link attach properties (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + else + { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_012: [`amqp_msgr_config->client_version` shall be set with `twin_msgr->client_version`] - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_013: [`amqp_msgr_config->device_id` shall be set with `twin_msgr->device_id`] - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_014: [`amqp_msgr_config->iothub_host_fqdn` shall be set with `twin_msgr->iothub_host_fqdn`] - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_015: [`amqp_msgr_config` shall have "twin/" as send link target suffix and receive link source suffix] - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_016: [`amqp_msgr_config` shall have send and receive link attach properties set as "com.microsoft:client-version" = `twin_msgr->client_version`, "com.microsoft:channel-correlation-id" = `twin:<UUID>`, "com.microsoft:api-version" = "2016-11-14"] - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_017: [`amqp_msgr_config` shall be set with `on_amqp_messenger_state_changed_callback` and `on_amqp_messenger_subscription_changed_callback` callbacks] - AMQP_MESSENGER_CONFIG amqp_msgr_config; - amqp_msgr_config.client_version = twin_msgr->client_version; - amqp_msgr_config.device_id = twin_msgr->device_id; - amqp_msgr_config.module_id = twin_msgr->module_id; - amqp_msgr_config.iothub_host_fqdn = twin_msgr->iothub_host_fqdn; - amqp_msgr_config.send_link.target_suffix = DEFAULT_TWIN_SEND_LINK_SOURCE_NAME; - amqp_msgr_config.send_link.attach_properties = link_attach_properties; - amqp_msgr_config.send_link.snd_settle_mode = sender_settle_mode_settled; - amqp_msgr_config.send_link.rcv_settle_mode = receiver_settle_mode_first; - amqp_msgr_config.receive_link.source_suffix = DEFAULT_TWIN_RECEIVE_LINK_TARGET_NAME; - amqp_msgr_config.receive_link.attach_properties = link_attach_properties; - amqp_msgr_config.receive_link.snd_settle_mode = sender_settle_mode_settled; - amqp_msgr_config.receive_link.rcv_settle_mode = receiver_settle_mode_first; - amqp_msgr_config.on_state_changed_callback = on_amqp_messenger_state_changed_callback; - amqp_msgr_config.on_state_changed_context = (void*)twin_msgr; - amqp_msgr_config.on_subscription_changed_callback = on_amqp_messenger_subscription_changed_callback; - amqp_msgr_config.on_subscription_changed_context = (void*)twin_msgr; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_012: [`amqp_msgr_config->client_version` shall be set with `twin_msgr->client_version`] + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_013: [`amqp_msgr_config->device_id` shall be set with `twin_msgr->device_id`] + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_014: [`amqp_msgr_config->iothub_host_fqdn` shall be set with `twin_msgr->iothub_host_fqdn`] + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_015: [`amqp_msgr_config` shall have "twin/" as send link target suffix and receive link source suffix] + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_016: [`amqp_msgr_config` shall have send and receive link attach properties set as "com.microsoft:client-version" = `twin_msgr->client_version`, "com.microsoft:channel-correlation-id" = `twin:<UUID>`, "com.microsoft:api-version" = "2016-11-14"] + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_017: [`amqp_msgr_config` shall be set with `on_amqp_messenger_state_changed_callback` and `on_amqp_messenger_subscription_changed_callback` callbacks] + AMQP_MESSENGER_CONFIG amqp_msgr_config; + amqp_msgr_config.client_version = twin_msgr->client_version; + amqp_msgr_config.device_id = twin_msgr->device_id; + amqp_msgr_config.module_id = twin_msgr->module_id; + amqp_msgr_config.iothub_host_fqdn = twin_msgr->iothub_host_fqdn; + amqp_msgr_config.send_link.target_suffix = DEFAULT_TWIN_SEND_LINK_SOURCE_NAME; + amqp_msgr_config.send_link.attach_properties = link_attach_properties; + amqp_msgr_config.send_link.snd_settle_mode = sender_settle_mode_settled; + amqp_msgr_config.send_link.rcv_settle_mode = receiver_settle_mode_first; + amqp_msgr_config.receive_link.source_suffix = DEFAULT_TWIN_RECEIVE_LINK_TARGET_NAME; + amqp_msgr_config.receive_link.attach_properties = link_attach_properties; + amqp_msgr_config.receive_link.snd_settle_mode = sender_settle_mode_settled; + amqp_msgr_config.receive_link.rcv_settle_mode = receiver_settle_mode_first; + amqp_msgr_config.on_state_changed_callback = on_amqp_messenger_state_changed_callback; + amqp_msgr_config.on_state_changed_context = (void*)twin_msgr; + amqp_msgr_config.on_subscription_changed_callback = on_amqp_messenger_subscription_changed_callback; + amqp_msgr_config.on_subscription_changed_context = (void*)twin_msgr; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_011: [`twin_msgr->amqp_msgr` shall be set using amqp_messenger_create(), passing a AMQP_MESSENGER_CONFIG instance `amqp_msgr_config`] - if ((twin_msgr->amqp_msgr = amqp_messenger_create(&amqp_msgr_config)) == NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_018: [If amqp_messenger_create() fails, twin_messenger_create() shall fail and return NULL] - LogError("Failed creating the AMQP messenger (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_019: [`twin_msgr->amqp_msgr` shall subscribe for AMQP messages by calling amqp_messenger_subscribe_for_messages() passing `on_amqp_message_received`] - else if (amqp_messenger_subscribe_for_messages(twin_msgr->amqp_msgr, on_amqp_message_received_callback, (void*)twin_msgr)) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_020: [If amqp_messenger_subscribe_for_messages() fails, twin_messenger_create() shall fail and return NULL] - LogError("Failed subscribing for AMQP messages (%s)", messenger_config->device_id); - internal_twin_messenger_destroy(twin_msgr); - twin_msgr = NULL; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_TWIN_MESSENGER_09_013: [`messenger_config->on_state_changed_callback` shall be saved into `twin_msgr->on_state_changed_callback`] - twin_msgr->on_state_changed_callback = messenger_config->on_state_changed_callback; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_011: [`twin_msgr->amqp_msgr` shall be set using amqp_messenger_create(), passing a AMQP_MESSENGER_CONFIG instance `amqp_msgr_config`] + if ((twin_msgr->amqp_msgr = amqp_messenger_create(&amqp_msgr_config)) == NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_018: [If amqp_messenger_create() fails, twin_messenger_create() shall fail and return NULL] + LogError("Failed creating the AMQP messenger (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_019: [`twin_msgr->amqp_msgr` shall subscribe for AMQP messages by calling amqp_messenger_subscribe_for_messages() passing `on_amqp_message_received`] + else if (amqp_messenger_subscribe_for_messages(twin_msgr->amqp_msgr, on_amqp_message_received_callback, (void*)twin_msgr)) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_020: [If amqp_messenger_subscribe_for_messages() fails, twin_messenger_create() shall fail and return NULL] + LogError("Failed subscribing for AMQP messages (%s)", messenger_config->device_id); + internal_twin_messenger_destroy(twin_msgr); + twin_msgr = NULL; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_TWIN_MESSENGER_09_013: [`messenger_config->on_state_changed_callback` shall be saved into `twin_msgr->on_state_changed_callback`] + twin_msgr->on_state_changed_callback = messenger_config->on_state_changed_callback; - // Codes_SRS_IOTHUBTRANSPORT_TWIN_MESSENGER_09_014: [`messenger_config->on_state_changed_context` shall be saved into `twin_msgr->on_state_changed_context`] - twin_msgr->on_state_changed_context = messenger_config->on_state_changed_context; - } + // Codes_SRS_IOTHUBTRANSPORT_TWIN_MESSENGER_09_014: [`messenger_config->on_state_changed_context` shall be saved into `twin_msgr->on_state_changed_context`] + twin_msgr->on_state_changed_context = messenger_config->on_state_changed_context; + } - destroy_link_attach_properties(link_attach_properties); - } - } - } + destroy_link_attach_properties(link_attach_properties); + } + } + } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_021: [If no failures occurr, twin_messenger_create() shall return a handle to `twin_msgr`] - return (TWIN_MESSENGER_HANDLE)twin_msgr; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_021: [If no failures occurr, twin_messenger_create() shall return a handle to `twin_msgr`] + return (TWIN_MESSENGER_HANDLE)twin_msgr; } int twin_messenger_report_state_async(TWIN_MESSENGER_HANDLE twin_msgr_handle, CONSTBUFFER_HANDLE data, TWIN_MESSENGER_REPORT_STATE_COMPLETE_CALLBACK on_report_state_complete_callback, const void* context) { - int result; + int result; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_022: [If `twin_msgr_handle` or `data` are NULL, twin_messenger_report_state_async() shall fail and return a non-zero value] - if (twin_msgr_handle == NULL || data == NULL) - { - LogError("Invalid argument (twin_msgr_handle=%p, data=%p)", twin_msgr_handle, data); - result = __FAILURE__; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - TWIN_PATCH_OPERATION_CONTEXT* twin_patch_ctx; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_022: [If `twin_msgr_handle` or `data` are NULL, twin_messenger_report_state_async() shall fail and return a non-zero value] + if (twin_msgr_handle == NULL || data == NULL) + { + LogError("Invalid argument (twin_msgr_handle=%p, data=%p)", twin_msgr_handle, data); + result = __FAILURE__; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + TWIN_PATCH_OPERATION_CONTEXT* twin_patch_ctx; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_023: [twin_messenger_report_state_async() shall allocate memory for a TWIN_PATCH_OPERATION_CONTEXT structure (aka `twin_op_ctx`)] - if ((twin_patch_ctx = (TWIN_PATCH_OPERATION_CONTEXT*)malloc(sizeof(TWIN_PATCH_OPERATION_CONTEXT))) == NULL) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_024: [If malloc() fails, twin_messenger_report_state_async() shall fail and return a non-zero value] - LogError("Failed creating context for sending reported state (%s)", twin_msgr->device_id); - result = __FAILURE__; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_025: [`twin_op_ctx` shall have a copy of `data`] - else if ((twin_patch_ctx->data = CONSTBUFFER_Clone(data)) == NULL) - { - LogError("Failed cloning TWIN patch request data (%s)", twin_msgr->device_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_031: [If any failure occurs, twin_messenger_report_state_async() shall free any memory it has allocated] - free(twin_patch_ctx); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_026: [If `data` fails to be copied, twin_messenger_report_state_async() shall fail and return a non-zero value] - result = __FAILURE__; - } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_027: [`twin_op_ctx->time_enqueued` shall be set using get_time] - else if ((twin_patch_ctx->time_enqueued = get_time(NULL)) == INDEFINITE_TIME) - { - LogError("Failed setting reported state enqueue time (%s)", twin_msgr->device_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_031: [If any failure occurs, twin_messenger_report_state_async() shall free any memory it has allocated] - CONSTBUFFER_Destroy(twin_patch_ctx->data); - free(twin_patch_ctx); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_028: [If `twin_op_ctx->time_enqueued` fails to be set, twin_messenger_report_state_async() shall fail and return a non-zero value] - result = __FAILURE__; - } - else - { - twin_patch_ctx->on_report_state_complete_callback = on_report_state_complete_callback; - twin_patch_ctx->on_report_state_complete_context = context; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_023: [twin_messenger_report_state_async() shall allocate memory for a TWIN_PATCH_OPERATION_CONTEXT structure (aka `twin_op_ctx`)] + if ((twin_patch_ctx = (TWIN_PATCH_OPERATION_CONTEXT*)malloc(sizeof(TWIN_PATCH_OPERATION_CONTEXT))) == NULL) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_024: [If malloc() fails, twin_messenger_report_state_async() shall fail and return a non-zero value] + LogError("Failed creating context for sending reported state (%s)", twin_msgr->device_id); + result = __FAILURE__; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_025: [`twin_op_ctx` shall have a copy of `data`] + else if ((twin_patch_ctx->data = CONSTBUFFER_Clone(data)) == NULL) + { + LogError("Failed cloning TWIN patch request data (%s)", twin_msgr->device_id); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_031: [If any failure occurs, twin_messenger_report_state_async() shall free any memory it has allocated] + free(twin_patch_ctx); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_026: [If `data` fails to be copied, twin_messenger_report_state_async() shall fail and return a non-zero value] + result = __FAILURE__; + } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_027: [`twin_op_ctx->time_enqueued` shall be set using get_time] + else if ((twin_patch_ctx->time_enqueued = get_time(NULL)) == INDEFINITE_TIME) + { + LogError("Failed setting reported state enqueue time (%s)", twin_msgr->device_id); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_031: [If any failure occurs, twin_messenger_report_state_async() shall free any memory it has allocated] + CONSTBUFFER_Destroy(twin_patch_ctx->data); + free(twin_patch_ctx); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_028: [If `twin_op_ctx->time_enqueued` fails to be set, twin_messenger_report_state_async() shall fail and return a non-zero value] + result = __FAILURE__; + } + else + { + twin_patch_ctx->on_report_state_complete_callback = on_report_state_complete_callback; + twin_patch_ctx->on_report_state_complete_context = context; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_029: [`twin_op_ctx` shall be added to `twin_msgr->pending_patches` using singlylinkedlist_add()] - if (singlylinkedlist_add(twin_msgr->pending_patches, twin_patch_ctx) == NULL) - { - LogError("Failed adding TWIN patch request to queue (%s)", twin_msgr->device_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_031: [If any failure occurs, twin_messenger_report_state_async() shall free any memory it has allocated] - CONSTBUFFER_Destroy(twin_patch_ctx->data); - free(twin_patch_ctx); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_030: [If singlylinkedlist_add() fails, twin_messenger_report_state_async() shall fail and return a non-zero value] - result = __FAILURE__; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_032: [If no failures occur, twin_messenger_report_state_async() shall return zero] - result = RESULT_OK; - } - } - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_029: [`twin_op_ctx` shall be added to `twin_msgr->pending_patches` using singlylinkedlist_add()] + if (singlylinkedlist_add(twin_msgr->pending_patches, twin_patch_ctx) == NULL) + { + LogError("Failed adding TWIN patch request to queue (%s)", twin_msgr->device_id); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_031: [If any failure occurs, twin_messenger_report_state_async() shall free any memory it has allocated] + CONSTBUFFER_Destroy(twin_patch_ctx->data); + free(twin_patch_ctx); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_030: [If singlylinkedlist_add() fails, twin_messenger_report_state_async() shall fail and return a non-zero value] + result = __FAILURE__; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_032: [If no failures occur, twin_messenger_report_state_async() shall return zero] + result = RESULT_OK; + } + } + } - return result; + return result; } int twin_messenger_subscribe(TWIN_MESSENGER_HANDLE twin_msgr_handle, TWIN_STATE_UPDATE_CALLBACK on_twin_state_update_callback, void* context) { - int result; + int result; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_037: [If `twin_msgr_handle` or `on_twin_state_update_callback` are NULL, twin_messenger_subscribe() shall fail and return a non-zero value] - if (twin_msgr_handle == NULL || on_twin_state_update_callback == NULL) - { - LogError("Invalid argument (twin_msgr_handle=%p, on_twin_state_update_callback=%p)", twin_msgr_handle, on_twin_state_update_callback); - result = __FAILURE__; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_037: [If `twin_msgr_handle` or `on_twin_state_update_callback` are NULL, twin_messenger_subscribe() shall fail and return a non-zero value] + if (twin_msgr_handle == NULL || on_twin_state_update_callback == NULL) + { + LogError("Invalid argument (twin_msgr_handle=%p, on_twin_state_update_callback=%p)", twin_msgr_handle, on_twin_state_update_callback); + result = __FAILURE__; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_038: [If `twin_msgr` is already subscribed, twin_messenger_subscribe() shall return zero] - if (twin_msgr->subscription_state != TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED) - { - result = RESULT_OK; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_039: [`on_twin_state_update_callback` and `context` shall be saved on `twin_msgr`] - twin_msgr->on_message_received_callback = on_twin_state_update_callback; - twin_msgr->on_message_received_context = context; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_040: [twin_messenger_subscribe() shall change `twin_msgr->subscription_state` to TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES] - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_041: [If no failures occurr, twin_messenger_subscribe() shall return 0] - result = RESULT_OK; - } - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_038: [If `twin_msgr` is already subscribed, twin_messenger_subscribe() shall return zero] + if (twin_msgr->subscription_state != TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED) + { + result = RESULT_OK; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_039: [`on_twin_state_update_callback` and `context` shall be saved on `twin_msgr`] + twin_msgr->on_message_received_callback = on_twin_state_update_callback; + twin_msgr->on_message_received_context = context; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_040: [twin_messenger_subscribe() shall change `twin_msgr->subscription_state` to TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES] + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_041: [If no failures occurr, twin_messenger_subscribe() shall return 0] + result = RESULT_OK; + } + } - return result; + return result; } int twin_messenger_unsubscribe(TWIN_MESSENGER_HANDLE twin_msgr_handle) { - int result; + int result; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_042: [If `twin_msgr_handle` is NULL, twin_messenger_unsubscribe() shall fail and return a non-zero value] - if (twin_msgr_handle == NULL) - { - LogError("Invalid argument (twin_msgr_handle is NULL)"); - result = __FAILURE__; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_043: [twin_messenger_subscribe() shall change `twin_msgr->subscription_state` to TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE] - twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE; - twin_msgr->on_message_received_callback = NULL; - twin_msgr->on_message_received_context = NULL; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_044: [If no failures occurr, twin_messenger_unsubscribe() shall return zero] - result = RESULT_OK; - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_042: [If `twin_msgr_handle` is NULL, twin_messenger_unsubscribe() shall fail and return a non-zero value] + if (twin_msgr_handle == NULL) + { + LogError("Invalid argument (twin_msgr_handle is NULL)"); + result = __FAILURE__; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_043: [twin_messenger_subscribe() shall change `twin_msgr->subscription_state` to TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE] + twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE; + twin_msgr->on_message_received_callback = NULL; + twin_msgr->on_message_received_context = NULL; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_044: [If no failures occurr, twin_messenger_unsubscribe() shall return zero] + result = RESULT_OK; + } - return result; + return result; } int twin_messenger_get_send_status(TWIN_MESSENGER_HANDLE twin_msgr_handle, TWIN_MESSENGER_SEND_STATUS* send_status) { - int result; + int result; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_033: [If `twin_msgr_handle` or `send_status` are NULL, twin_messenger_get_send_status() shall fail and return a non-zero value] - if (twin_msgr_handle == NULL || send_status == NULL) - { - LogError("Invalid argument (twin_msgr_handle=%p, send_status=%p)", twin_msgr_handle, send_status); - result = __FAILURE__; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - TWIN_OPERATION_TYPE twin_op_type = TWIN_OPERATION_TYPE_PATCH; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_033: [If `twin_msgr_handle` or `send_status` are NULL, twin_messenger_get_send_status() shall fail and return a non-zero value] + if (twin_msgr_handle == NULL || send_status == NULL) + { + LogError("Invalid argument (twin_msgr_handle=%p, send_status=%p)", twin_msgr_handle, send_status); + result = __FAILURE__; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + TWIN_OPERATION_TYPE twin_op_type = TWIN_OPERATION_TYPE_PATCH; - if (singlylinkedlist_get_head_item(twin_msgr->pending_patches) != NULL || - singlylinkedlist_find(twin_msgr->operations, find_twin_operation_by_type, &twin_op_type)) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_034: [If `twin_msgr->pending_patches` or `twin_msgr->operations` have any TWIN patch requests, send_status shall be set to TWIN_MESSENGER_SEND_STATUS_BUSY] - *send_status = TWIN_MESSENGER_SEND_STATUS_BUSY; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_035: [Otherwise, send_status shall be set to TWIN_MESSENGER_SEND_STATUS_IDLE] - *send_status = TWIN_MESSENGER_SEND_STATUS_IDLE; - } + if (singlylinkedlist_get_head_item(twin_msgr->pending_patches) != NULL || + singlylinkedlist_find(twin_msgr->operations, find_twin_operation_by_type, &twin_op_type)) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_034: [If `twin_msgr->pending_patches` or `twin_msgr->operations` have any TWIN patch requests, send_status shall be set to TWIN_MESSENGER_SEND_STATUS_BUSY] + *send_status = TWIN_MESSENGER_SEND_STATUS_BUSY; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_035: [Otherwise, send_status shall be set to TWIN_MESSENGER_SEND_STATUS_IDLE] + *send_status = TWIN_MESSENGER_SEND_STATUS_IDLE; + } - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_036: [If no failures occur, twin_messenger_get_send_status() shall return 0] - result = RESULT_OK; - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_036: [If no failures occur, twin_messenger_get_send_status() shall return 0] + result = RESULT_OK; + } - return result; + return result; } int twin_messenger_start(TWIN_MESSENGER_HANDLE twin_msgr_handle, SESSION_HANDLE session_handle) { - int result; + int result; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_045: [If `twin_msgr_handle` or `session_handle` are NULL, twin_messenger_start() shall fail and return a non-zero value] - if (twin_msgr_handle == NULL || session_handle == NULL) - { - LogError("Invalid argument (twin_msgr_handle=%p, session_handle=%p)", twin_msgr_handle, session_handle); - result = __FAILURE__; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_045: [If `twin_msgr_handle` or `session_handle` are NULL, twin_messenger_start() shall fail and return a non-zero value] + if (twin_msgr_handle == NULL || session_handle == NULL) + { + LogError("Invalid argument (twin_msgr_handle=%p, session_handle=%p)", twin_msgr_handle, session_handle); + result = __FAILURE__; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_048: [If no failures occurr, `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_STARTING, and `twin_msgr->on_state_changed_callback` invoked if provided] - update_state(twin_msgr, TWIN_MESSENGER_STATE_STARTING); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_048: [If no failures occurr, `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_STARTING, and `twin_msgr->on_state_changed_callback` invoked if provided] + update_state(twin_msgr, TWIN_MESSENGER_STATE_STARTING); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_046: [amqp_messenger_start() shall be invoked passing `twin_msgr->amqp_msgr` and `session_handle`] - if (amqp_messenger_start(twin_msgr->amqp_msgr, session_handle) != 0) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_047: [If amqp_messenger_start() fails, twin_messenger_start() fail and return a non-zero value] - LogError("Failed starting the AMQP messenger (%s)", twin_msgr->device_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_049: [If any failures occurr, `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_ERROR, and `twin_msgr->on_state_changed_callback` invoked if provided] - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - result = __FAILURE__; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_050: [If no failures occurr, twin_messenger_start() shall return 0] - result = RESULT_OK; - } - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_046: [amqp_messenger_start() shall be invoked passing `twin_msgr->amqp_msgr` and `session_handle`] + if (amqp_messenger_start(twin_msgr->amqp_msgr, session_handle) != 0) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_047: [If amqp_messenger_start() fails, twin_messenger_start() fail and return a non-zero value] + LogError("Failed starting the AMQP messenger (%s)", twin_msgr->device_id); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_049: [If any failures occurr, `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_ERROR, and `twin_msgr->on_state_changed_callback` invoked if provided] + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + result = __FAILURE__; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_050: [If no failures occurr, twin_messenger_start() shall return 0] + result = RESULT_OK; + } + } - return result; + return result; } int twin_messenger_stop(TWIN_MESSENGER_HANDLE twin_msgr_handle) { - int result; + int result; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_051: [If `twin_msgr_handle` is NULL, twin_messenger_stop() shall fail and return a non-zero value] - if (twin_msgr_handle == NULL) - { - LogError("Invalid argument (twin_msgr_handle is NULL)"); - result = __FAILURE__; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_051: [If `twin_msgr_handle` is NULL, twin_messenger_stop() shall fail and return a non-zero value] + if (twin_msgr_handle == NULL) + { + LogError("Invalid argument (twin_msgr_handle is NULL)"); + result = __FAILURE__; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_054: [`twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_STOPPING, and `twin_msgr->on_state_changed_callback` invoked if provided] - update_state(twin_msgr, TWIN_MESSENGER_STATE_STOPPING); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_054: [`twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_STOPPING, and `twin_msgr->on_state_changed_callback` invoked if provided] + update_state(twin_msgr, TWIN_MESSENGER_STATE_STOPPING); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_052: [amqp_messenger_stop() shall be invoked passing `twin_msgr->amqp_msgr`] - if (amqp_messenger_stop(twin_msgr->amqp_msgr) != 0) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_053: [If amqp_messenger_stop() fails, twin_messenger_stop() fail and return a non-zero value] - LogError("Failed stopping the AMQP messenger (%s)", twin_msgr->device_id); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_055: [If any failures occurr, `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_ERROR, and `twin_msgr->on_state_changed_callback` invoked if provided] - update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); - result = __FAILURE__; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_056: [If no failures occurr, twin_messenger_stop() shall return 0] - result = RESULT_OK; - } - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_052: [amqp_messenger_stop() shall be invoked passing `twin_msgr->amqp_msgr`] + if (amqp_messenger_stop(twin_msgr->amqp_msgr) != 0) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_053: [If amqp_messenger_stop() fails, twin_messenger_stop() fail and return a non-zero value] + LogError("Failed stopping the AMQP messenger (%s)", twin_msgr->device_id); + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_055: [If any failures occurr, `twin_msgr->state` shall be set to TWIN_MESSENGER_STATE_ERROR, and `twin_msgr->on_state_changed_callback` invoked if provided] + update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR); + result = __FAILURE__; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_056: [If no failures occurr, twin_messenger_stop() shall return 0] + result = RESULT_OK; + } + } - return result; + return result; } void twin_messenger_do_work(TWIN_MESSENGER_HANDLE twin_msgr_handle) { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_057: [If `twin_msgr_handle` is NULL, twin_messenger_do_work() shall return immediately] - if (twin_msgr_handle != NULL) - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_057: [If `twin_msgr_handle` is NULL, twin_messenger_do_work() shall return immediately] + if (twin_msgr_handle != NULL) + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - if (twin_msgr->state == TWIN_MESSENGER_STATE_STARTED) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_058: [If `twin_msgr->state` is TWIN_MESSENGER_STATE_STARTED, twin_messenger_do_work() shall send the PATCHES in `twin_msgr->pending_patches`, removing them from the list] - (void)singlylinkedlist_remove_if(twin_msgr->pending_patches, send_pending_twin_patch, (const void*)twin_msgr); + if (twin_msgr->state == TWIN_MESSENGER_STATE_STARTED) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_058: [If `twin_msgr->state` is TWIN_MESSENGER_STATE_STARTED, twin_messenger_do_work() shall send the PATCHES in `twin_msgr->pending_patches`, removing them from the list] + (void)singlylinkedlist_remove_if(twin_msgr->pending_patches, send_pending_twin_patch, (const void*)twin_msgr); - process_twin_subscription(twin_msgr); - } + process_twin_subscription(twin_msgr); + } - process_timeouts(twin_msgr); + process_timeouts(twin_msgr); - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_083: [twin_messenger_do_work() shall invoke amqp_messenger_do_work() passing `twin_msgr->amqp_msgr`] - amqp_messenger_do_work(twin_msgr->amqp_msgr); - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_083: [twin_messenger_do_work() shall invoke amqp_messenger_do_work() passing `twin_msgr->amqp_msgr`] + amqp_messenger_do_work(twin_msgr->amqp_msgr); + } } void twin_messenger_destroy(TWIN_MESSENGER_HANDLE twin_msgr_handle) { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_098: [If `twin_msgr_handle` is NULL, twin_messenger_destroy() shall return immediately] - if (twin_msgr_handle == NULL) - { - LogError("Invalid argument (twin_msgr_handle is NULL)"); - } - else - { - internal_twin_messenger_destroy((TWIN_MESSENGER_INSTANCE*)twin_msgr_handle); - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_098: [If `twin_msgr_handle` is NULL, twin_messenger_destroy() shall return immediately] + if (twin_msgr_handle == NULL) + { + LogError("Invalid argument (twin_msgr_handle is NULL)"); + } + else + { + internal_twin_messenger_destroy((TWIN_MESSENGER_INSTANCE*)twin_msgr_handle); + } } int twin_messenger_set_option(TWIN_MESSENGER_HANDLE twin_msgr_handle, const char* name, void* value) { - int result; + int result; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_103: [If `twin_msgr_handle` or `name` or `value` are NULL, twin_messenger_set_option() shall fail and return a non-zero value] - if (twin_msgr_handle == NULL || name == NULL || value == NULL) - { - LogError("Invalid argument (twin_msgr_handle=%p, name=%p, value=%p)", twin_msgr_handle, name, value); - result = __FAILURE__; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_103: [If `twin_msgr_handle` or `name` or `value` are NULL, twin_messenger_set_option() shall fail and return a non-zero value] + if (twin_msgr_handle == NULL || name == NULL || value == NULL) + { + LogError("Invalid argument (twin_msgr_handle=%p, name=%p, value=%p)", twin_msgr_handle, name, value); + result = __FAILURE__; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_104: [amqp_messenger_set_option() shall be invoked passing `name` and `option`] - if (amqp_messenger_set_option(twin_msgr->amqp_msgr, name, value) != RESULT_OK) - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_105: [If amqp_messenger_set_option() fails, twin_messenger_set_option() shall fail and return a non-zero value] - LogError("Failed setting TWIN messenger option (%s, %s)", twin_msgr->device_id, name); - result = __FAILURE__; - } - else - { - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_106: [If no errors occur, twin_messenger_set_option shall return zero] - result = RESULT_OK; - } - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_104: [amqp_messenger_set_option() shall be invoked passing `name` and `option`] + if (amqp_messenger_set_option(twin_msgr->amqp_msgr, name, value) != RESULT_OK) + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_105: [If amqp_messenger_set_option() fails, twin_messenger_set_option() shall fail and return a non-zero value] + LogError("Failed setting TWIN messenger option (%s, %s)", twin_msgr->device_id, name); + result = __FAILURE__; + } + else + { + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_106: [If no errors occur, twin_messenger_set_option shall return zero] + result = RESULT_OK; + } + } - return result; + return result; } OPTIONHANDLER_HANDLE twin_messenger_retrieve_options(TWIN_MESSENGER_HANDLE twin_msgr_handle) { - OPTIONHANDLER_HANDLE result; + OPTIONHANDLER_HANDLE result; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_107: [If `twin_msgr_handle` is NULL, twin_messenger_retrieve_options shall fail and return NULL] - if (twin_msgr_handle == NULL) - { - LogError("Invalid argument (twin_msgr_handle is NULL)"); - result = NULL; - } - else - { - TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_107: [If `twin_msgr_handle` is NULL, twin_messenger_retrieve_options shall fail and return NULL] + if (twin_msgr_handle == NULL) + { + LogError("Invalid argument (twin_msgr_handle is NULL)"); + result = NULL; + } + else + { + TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)twin_msgr_handle; - // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_108: [twin_messenger_retrieve_options() shall return the result of amqp_messenger_retrieve_options()] - if ((result = amqp_messenger_retrieve_options(twin_msgr->amqp_msgr)) == NULL) - { - LogError("Failed TWIN messenger options (%s)", twin_msgr->device_id); - } - } + // Codes_IOTHUBTRANSPORT_AMQP_TWIN_MESSENGER_09_108: [twin_messenger_retrieve_options() shall return the result of amqp_messenger_retrieve_options()] + if ((result = amqp_messenger_retrieve_options(twin_msgr->amqp_msgr)) == NULL) + { + LogError("Failed TWIN messenger options (%s)", twin_msgr->device_id); + } + } - return result; + return result; }