A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: link.c
- Revision:
- 28:add19eb7defa
- Parent:
- 27:d74f1cea23e1
- Child:
- 29:4a11413cf217
--- a/link.c Fri Jun 02 15:53:07 2017 -0700 +++ b/link.c Fri Jun 30 10:41:22 2017 -0700 @@ -19,36 +19,36 @@ typedef struct DELIVERY_INSTANCE_TAG { - delivery_number delivery_id; - ON_DELIVERY_SETTLED on_delivery_settled; - void* callback_context; - void* link; + delivery_number delivery_id; + ON_DELIVERY_SETTLED on_delivery_settled; + void* callback_context; + void* link; } DELIVERY_INSTANCE; typedef struct LINK_INSTANCE_TAG { - SESSION_HANDLE session; - LINK_STATE link_state; - LINK_STATE previous_link_state; - AMQP_VALUE source; - AMQP_VALUE target; - handle handle; - LINK_ENDPOINT_HANDLE link_endpoint; - char* name; - SINGLYLINKEDLIST_HANDLE pending_deliveries; - sequence_no delivery_count; - role role; - ON_LINK_STATE_CHANGED on_link_state_changed; - ON_LINK_FLOW_ON on_link_flow_on; + SESSION_HANDLE session; + LINK_STATE link_state; + LINK_STATE previous_link_state; + AMQP_VALUE source; + AMQP_VALUE target; + handle handle; + LINK_ENDPOINT_HANDLE link_endpoint; + char* name; + SINGLYLINKEDLIST_HANDLE pending_deliveries; + sequence_no delivery_count; + role role; + ON_LINK_STATE_CHANGED on_link_state_changed; + ON_LINK_FLOW_ON on_link_flow_on; ON_TRANSFER_RECEIVED on_transfer_received; - void* callback_context; - sender_settle_mode snd_settle_mode; - receiver_settle_mode rcv_settle_mode; - sequence_no initial_delivery_count; - uint64_t max_message_size; - uint64_t peer_max_message_size; - uint32_t link_credit; - uint32_t available; + void* callback_context; + sender_settle_mode snd_settle_mode; + receiver_settle_mode rcv_settle_mode; + sequence_no initial_delivery_count; + uint64_t max_message_size; + uint64_t peer_max_message_size; + uint32_t link_credit; + uint32_t available; fields attach_properties; bool is_underlying_session_begun; bool is_closed; @@ -59,13 +59,13 @@ static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state) { - link_instance->previous_link_state = link_instance->link_state; - link_instance->link_state = link_state; + link_instance->previous_link_state = link_instance->link_state; + link_instance->link_state = link_state; - if (link_instance->on_link_state_changed != NULL) - { - link_instance->on_link_state_changed(link_instance->callback_context, link_state, link_instance->previous_link_state); - } + if (link_instance->on_link_state_changed != NULL) + { + link_instance->on_link_state_changed(link_instance->callback_context, link_state, link_instance->previous_link_state); + } } static void remove_all_pending_deliveries(LINK_INSTANCE* link, bool indicate_settled) @@ -96,118 +96,118 @@ static int send_flow(LINK_INSTANCE* link) { - int result; - FLOW_HANDLE flow = flow_create(0, 0, 0); + int result; + FLOW_HANDLE flow = flow_create(0, 0, 0); - if (flow == NULL) - { - result = __FAILURE__; - } - else - { - if ((flow_set_link_credit(flow, link->link_credit) != 0) || - (flow_set_handle(flow, link->handle) != 0) || - (flow_set_delivery_count(flow, link->delivery_count) != 0)) - { - result = __FAILURE__; - } - else - { - if (session_send_flow(link->link_endpoint, flow) != 0) - { - result = __FAILURE__; - } - else - { - result = 0; - } - } + if (flow == NULL) + { + result = __FAILURE__; + } + else + { + if ((flow_set_link_credit(flow, link->link_credit) != 0) || + (flow_set_handle(flow, link->handle) != 0) || + (flow_set_delivery_count(flow, link->delivery_count) != 0)) + { + result = __FAILURE__; + } + else + { + if (session_send_flow(link->link_endpoint, flow) != 0) + { + result = __FAILURE__; + } + else + { + result = 0; + } + } - flow_destroy(flow); - } + flow_destroy(flow); + } - return result; + return result; } static int send_disposition(LINK_INSTANCE* link_instance, delivery_number delivery_number, AMQP_VALUE delivery_state) { - int result; + int result; - DISPOSITION_HANDLE disposition = disposition_create(link_instance->role, delivery_number); - if (disposition == NULL) - { - result = __FAILURE__; - } - else - { - if ((disposition_set_last(disposition, delivery_number) != 0) || - (disposition_set_settled(disposition, true) != 0) || - ((delivery_state != NULL) && (disposition_set_state(disposition, delivery_state) != 0))) - { - result = __FAILURE__; - } - else - { - if (session_send_disposition(link_instance->link_endpoint, disposition) != 0) - { - result = __FAILURE__; - } - else - { - result = 0; - } - } + DISPOSITION_HANDLE disposition = disposition_create(link_instance->role, delivery_number); + if (disposition == NULL) + { + result = __FAILURE__; + } + else + { + if ((disposition_set_last(disposition, delivery_number) != 0) || + (disposition_set_settled(disposition, true) != 0) || + ((delivery_state != NULL) && (disposition_set_state(disposition, delivery_state) != 0))) + { + result = __FAILURE__; + } + else + { + if (session_send_disposition(link_instance->link_endpoint, disposition) != 0) + { + result = __FAILURE__; + } + else + { + result = 0; + } + } - disposition_destroy(disposition); - } + disposition_destroy(disposition); + } - return result; + return result; } static int send_detach(LINK_INSTANCE* link_instance, bool close, ERROR_HANDLE error_handle) { - int result; - DETACH_HANDLE detach_performative; + int result; + DETACH_HANDLE detach_performative; - detach_performative = detach_create(0); - if (detach_performative == NULL) - { - result = __FAILURE__; - } - else - { - if ((error_handle != NULL) && - (detach_set_error(detach_performative, error_handle) != 0)) - { - result = __FAILURE__; - } + detach_performative = detach_create(0); + if (detach_performative == NULL) + { + result = __FAILURE__; + } + else + { + if ((error_handle != NULL) && + (detach_set_error(detach_performative, error_handle) != 0)) + { + result = __FAILURE__; + } else if (close && (detach_set_closed(detach_performative, true) != 0)) { result = __FAILURE__; } else - { - if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0) - { - result = __FAILURE__; - } - else - { + { + if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0) + { + result = __FAILURE__; + } + else + { if (close) { /* Declare link to be closed */ link_instance->is_closed = true; } - result = 0; - } - } + result = 0; + } + } - detach_destroy(detach_performative); - } + detach_destroy(detach_performative); + } - return result; + return result; } static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role) @@ -264,107 +264,107 @@ static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes) { - LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; - AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative); + LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; + AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative); - if (is_attach_type_by_descriptor(descriptor)) - { - ATTACH_HANDLE attach_handle; + if (is_attach_type_by_descriptor(descriptor)) + { + ATTACH_HANDLE attach_handle; - if (amqpvalue_get_attach(performative, &attach_handle) == 0) - { - if ((link_instance->role == role_receiver) && - (attach_get_initial_delivery_count(attach_handle, &link_instance->delivery_count) != 0)) - { - /* error */ - set_link_state(link_instance, LINK_STATE_DETACHED); - } - else - { - if (attach_get_max_message_size(attach_handle, &link_instance->peer_max_message_size) != 0) - { - LogError("Could not retrieve peer_max_message_size from attach frame"); - } + if (amqpvalue_get_attach(performative, &attach_handle) == 0) + { + if ((link_instance->role == role_receiver) && + (attach_get_initial_delivery_count(attach_handle, &link_instance->delivery_count) != 0)) + { + /* error */ + set_link_state(link_instance, LINK_STATE_DETACHED); + } + else + { + if (attach_get_max_message_size(attach_handle, &link_instance->peer_max_message_size) != 0) + { + LogError("Could not retrieve peer_max_message_size from attach frame"); + } - if ((link_instance->link_state == LINK_STATE_DETACHED) || - (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT)) - { - if (link_instance->role == role_receiver) - { - link_instance->link_credit = DEFAULT_LINK_CREDIT; - send_flow(link_instance); - } - else - { - link_instance->link_credit = 0; - } + if ((link_instance->link_state == LINK_STATE_DETACHED) || + (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT)) + { + if (link_instance->role == role_receiver) + { + link_instance->link_credit = DEFAULT_LINK_CREDIT; + send_flow(link_instance); + } + else + { + link_instance->link_credit = 0; + } - if (link_instance->link_state == LINK_STATE_DETACHED) - { - set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED); - } - else - { - set_link_state(link_instance, LINK_STATE_ATTACHED); - } - } - } + if (link_instance->link_state == LINK_STATE_DETACHED) + { + set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED); + } + else + { + set_link_state(link_instance, LINK_STATE_ATTACHED); + } + } + } - attach_destroy(attach_handle); - } - } - else if (is_flow_type_by_descriptor(descriptor)) - { - FLOW_HANDLE flow_handle; - if (amqpvalue_get_flow(performative, &flow_handle) == 0) - { - if (link_instance->role == role_sender) - { - delivery_number rcv_delivery_count; - uint32_t rcv_link_credit; + attach_destroy(attach_handle); + } + } + else if (is_flow_type_by_descriptor(descriptor)) + { + FLOW_HANDLE flow_handle; + if (amqpvalue_get_flow(performative, &flow_handle) == 0) + { + if (link_instance->role == role_sender) + { + delivery_number rcv_delivery_count; + uint32_t rcv_link_credit; - if ((flow_get_link_credit(flow_handle, &rcv_link_credit) != 0) || - (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0)) - { - /* error */ - set_link_state(link_instance, LINK_STATE_DETACHED); - } - else - { - link_instance->link_credit = rcv_delivery_count + rcv_link_credit - link_instance->delivery_count; - if (link_instance->link_credit > 0) - { - link_instance->on_link_flow_on(link_instance->callback_context); - } - } - } - } + if ((flow_get_link_credit(flow_handle, &rcv_link_credit) != 0) || + (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0)) + { + /* error */ + set_link_state(link_instance, LINK_STATE_DETACHED); + } + else + { + link_instance->link_credit = rcv_delivery_count + rcv_link_credit - link_instance->delivery_count; + if (link_instance->link_credit > 0) + { + link_instance->on_link_flow_on(link_instance->callback_context); + } + } + } + } - flow_destroy(flow_handle); - } - else if (is_transfer_type_by_descriptor(descriptor)) - { - if (link_instance->on_transfer_received != NULL) - { - TRANSFER_HANDLE transfer_handle; - if (amqpvalue_get_transfer(performative, &transfer_handle) == 0) - { - AMQP_VALUE delivery_state; + flow_destroy(flow_handle); + } + else if (is_transfer_type_by_descriptor(descriptor)) + { + if (link_instance->on_transfer_received != NULL) + { + TRANSFER_HANDLE transfer_handle; + if (amqpvalue_get_transfer(performative, &transfer_handle) == 0) + { + AMQP_VALUE delivery_state; bool more; - bool is_error; + bool is_error; - link_instance->link_credit--; - link_instance->delivery_count++; - if (link_instance->link_credit == 0) - { - link_instance->link_credit = DEFAULT_LINK_CREDIT; - send_flow(link_instance); - } + link_instance->link_credit--; + link_instance->delivery_count++; + if (link_instance->link_credit == 0) + { + link_instance->link_credit = DEFAULT_LINK_CREDIT; + send_flow(link_instance); + } - more = false; - /* Attempt to get more flag, default to false */ - (void)transfer_get_more(transfer_handle, &more); - is_error = false; + more = false; + /* Attempt to get more flag, default to false */ + (void)transfer_get_more(transfer_handle, &more); + is_error = false; if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0) { @@ -431,34 +431,34 @@ } } - transfer_destroy(transfer_handle); - } - } - } - else if (is_disposition_type_by_descriptor(descriptor)) - { - DISPOSITION_HANDLE disposition; - if (amqpvalue_get_disposition(performative, &disposition) != 0) - { - /* error */ - } - else - { - delivery_number first; - delivery_number last; + transfer_destroy(transfer_handle); + } + } + } + else if (is_disposition_type_by_descriptor(descriptor)) + { + DISPOSITION_HANDLE disposition; + if (amqpvalue_get_disposition(performative, &disposition) != 0) + { + /* error */ + } + else + { + delivery_number first; + delivery_number last; - if (disposition_get_first(disposition, &first) != 0) - { - /* error */ - } - else - { + if (disposition_get_first(disposition, &first) != 0) + { + /* error */ + } + else + { bool settled; - if (disposition_get_last(disposition, &last) != 0) - { - last = first; - } + if (disposition_get_last(disposition, &last) != 0) + { + last = first; + } if (disposition_get_settled(disposition, &settled) != 0) { @@ -509,13 +509,13 @@ } } } - } + } - disposition_destroy(disposition); - } - } - else if (is_detach_type_by_descriptor(descriptor)) - { + disposition_destroy(disposition); + } + } + else if (is_detach_type_by_descriptor(descriptor)) + { DETACH_HANDLE detach; /* Set link state appropriately based on whether we received detach condition */ @@ -562,198 +562,198 @@ static void on_session_state_changed(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state) { - LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; + LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; (void)previous_session_state; - if (new_session_state == SESSION_STATE_MAPPED) - { - if ((link_instance->link_state == LINK_STATE_DETACHED) && (!link_instance->is_closed)) - { - if (send_attach(link_instance, link_instance->name, 0, link_instance->role) == 0) - { - set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_SENT); - } - } - } - else if (new_session_state == SESSION_STATE_DISCARDING) - { + if (new_session_state == SESSION_STATE_MAPPED) + { + if ((link_instance->link_state == LINK_STATE_DETACHED) && (!link_instance->is_closed)) + { + if (send_attach(link_instance, link_instance->name, 0, link_instance->role) == 0) + { + set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_SENT); + } + } + } + else if (new_session_state == SESSION_STATE_DISCARDING) + { set_link_state(link_instance, LINK_STATE_DETACHED); remove_all_pending_deliveries(link_instance, true); - } - else if (new_session_state == SESSION_STATE_ERROR) - { + } + else if (new_session_state == SESSION_STATE_ERROR) + { set_link_state(link_instance, LINK_STATE_ERROR); remove_all_pending_deliveries(link_instance, true); - } + } } static void on_session_flow_on(void* context) { - LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; - if (link_instance->role == role_sender) - { - link_instance->on_link_flow_on(link_instance->callback_context); - } + LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context; + if (link_instance->role == role_sender) + { + link_instance->on_link_flow_on(link_instance->callback_context); + } } static void on_send_complete(void* context, IO_SEND_RESULT send_result) { - LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context; - DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(delivery_instance_list_item); - LINK_INSTANCE* link_instance = (LINK_INSTANCE*)delivery_instance->link; + LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context; + DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(delivery_instance_list_item); + LINK_INSTANCE* link_instance = (LINK_INSTANCE*)delivery_instance->link; (void)send_result; - if (link_instance->snd_settle_mode == sender_settle_mode_settled) - { - delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_SETTLED, NULL); - free(delivery_instance); - (void)singlylinkedlist_remove(link_instance->pending_deliveries, delivery_instance_list_item); - } + if (link_instance->snd_settle_mode == sender_settle_mode_settled) + { + delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_SETTLED, NULL); + free(delivery_instance); + (void)singlylinkedlist_remove(link_instance->pending_deliveries, delivery_instance_list_item); + } } LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) { - LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE)); - if (result != NULL) - { - result->link_state = LINK_STATE_DETACHED; - result->previous_link_state = LINK_STATE_DETACHED; - result->role = role; - result->source = amqpvalue_clone(source); - result->target = amqpvalue_clone(target); - result->session = session; - result->handle = 0; - result->snd_settle_mode = sender_settle_mode_unsettled; - result->rcv_settle_mode = receiver_settle_mode_first; - result->delivery_count = 0; - result->initial_delivery_count = 0; - result->max_message_size = 0; - result->peer_max_message_size = 0; - result->is_underlying_session_begun = false; + LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE)); + if (result != NULL) + { + result->link_state = LINK_STATE_DETACHED; + result->previous_link_state = LINK_STATE_DETACHED; + result->role = role; + result->source = amqpvalue_clone(source); + result->target = amqpvalue_clone(target); + result->session = session; + result->handle = 0; + result->snd_settle_mode = sender_settle_mode_unsettled; + result->rcv_settle_mode = receiver_settle_mode_first; + result->delivery_count = 0; + result->initial_delivery_count = 0; + result->max_message_size = 0; + result->peer_max_message_size = 0; + result->is_underlying_session_begun = false; result->is_closed = false; result->attach_properties = NULL; result->received_payload = NULL; result->received_payload_size = 0; result->received_delivery_id = 0; - result->pending_deliveries = singlylinkedlist_create(); - if (result->pending_deliveries == NULL) - { - free(result); - result = NULL; - } - else - { + result->pending_deliveries = singlylinkedlist_create(); + if (result->pending_deliveries == NULL) + { + free(result); + result = NULL; + } + else + { size_t name_length = strlen(name); - result->name = (char*)malloc(name_length + 1); - if (result->name == NULL) - { - singlylinkedlist_destroy(result->pending_deliveries); - free(result); - result = NULL; - } - else - { - result->on_link_state_changed = NULL; - result->callback_context = NULL; - set_link_state(result, LINK_STATE_DETACHED); + result->name = (char*)malloc(name_length + 1); + if (result->name == NULL) + { + singlylinkedlist_destroy(result->pending_deliveries); + free(result); + result = NULL; + } + else + { + result->on_link_state_changed = NULL; + result->callback_context = NULL; + set_link_state(result, LINK_STATE_DETACHED); - (void)memcpy(result->name, name, name_length + 1); - result->link_endpoint = session_create_link_endpoint(session, name); - if (result->link_endpoint == NULL) - { - singlylinkedlist_destroy(result->pending_deliveries); - free(result->name); - free(result); - result = NULL; - } - } - } - } + (void)memcpy(result->name, name, name_length + 1); + result->link_endpoint = session_create_link_endpoint(session, name); + if (result->link_endpoint == NULL) + { + singlylinkedlist_destroy(result->pending_deliveries); + free(result->name); + free(result); + result = NULL; + } + } + } + } - return result; + return result; } LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HANDLE link_endpoint, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) { - LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE)); - if (result != NULL) - { - result->link_state = LINK_STATE_DETACHED; - result->previous_link_state = LINK_STATE_DETACHED; - result->session = session; - result->handle = 0; - result->snd_settle_mode = sender_settle_mode_unsettled; - result->rcv_settle_mode = receiver_settle_mode_first; - result->delivery_count = 0; - result->initial_delivery_count = 0; - result->max_message_size = 0; - result->peer_max_message_size = 0; - result->is_underlying_session_begun = false; + LINK_INSTANCE* result = (LINK_INSTANCE*)malloc(sizeof(LINK_INSTANCE)); + if (result != NULL) + { + result->link_state = LINK_STATE_DETACHED; + result->previous_link_state = LINK_STATE_DETACHED; + result->session = session; + result->handle = 0; + result->snd_settle_mode = sender_settle_mode_unsettled; + result->rcv_settle_mode = receiver_settle_mode_first; + result->delivery_count = 0; + result->initial_delivery_count = 0; + result->max_message_size = 0; + result->peer_max_message_size = 0; + result->is_underlying_session_begun = false; result->is_closed = false; result->attach_properties = NULL; result->received_payload = NULL; result->received_payload_size = 0; result->received_delivery_id = 0; result->source = amqpvalue_clone(target); - result->target = amqpvalue_clone(source); - if (role == role_sender) - { - result->role = role_receiver; - } - else - { - result->role = role_sender; - } + result->target = amqpvalue_clone(source); + if (role == role_sender) + { + result->role = role_receiver; + } + else + { + result->role = role_sender; + } - result->pending_deliveries = singlylinkedlist_create(); - if (result->pending_deliveries == NULL) - { - free(result); - result = NULL; - } - else - { + result->pending_deliveries = singlylinkedlist_create(); + if (result->pending_deliveries == NULL) + { + free(result); + result = NULL; + } + else + { size_t name_length = strlen(name); - result->name = (char*)malloc(name_length + 1); - if (result->name == NULL) - { - singlylinkedlist_destroy(result->pending_deliveries); - free(result); - result = NULL; - } - else - { - (void)memcpy(result->name, name, name_length + 1); - result->on_link_state_changed = NULL; - result->callback_context = NULL; - result->link_endpoint = link_endpoint; - } - } - } + result->name = (char*)malloc(name_length + 1); + if (result->name == NULL) + { + singlylinkedlist_destroy(result->pending_deliveries); + free(result); + result = NULL; + } + else + { + (void)memcpy(result->name, name, name_length + 1); + result->on_link_state_changed = NULL; + result->callback_context = NULL; + result->link_endpoint = link_endpoint; + } + } + } - return result; + return result; } void link_destroy(LINK_HANDLE link) { - if (link != NULL) - { + if (link != NULL) + { remove_all_pending_deliveries((LINK_INSTANCE*)link, false); link->on_link_state_changed = NULL; (void)link_detach(link, true); session_destroy_link_endpoint(link->link_endpoint); - amqpvalue_destroy(link->source); - amqpvalue_destroy(link->target); + amqpvalue_destroy(link->source); + amqpvalue_destroy(link->target); - if (link->name != NULL) - { - free(link->name); - } + if (link->name != NULL) + { + free(link->name); + } - if (link->attach_properties != NULL) + if (link->attach_properties != NULL) { - amqpvalue_destroy(link->attach_properties); + amqpvalue_destroy(link->attach_properties); } if (link->received_payload != NULL) @@ -761,175 +761,175 @@ free(link->received_payload); } - free(link); - } + free(link); + } } int link_set_snd_settle_mode(LINK_HANDLE link, sender_settle_mode snd_settle_mode) { - int result; + int result; - if (link == NULL) - { - result = __FAILURE__; - } - else - { - link->snd_settle_mode = snd_settle_mode; - result = 0; - } + if (link == NULL) + { + result = __FAILURE__; + } + else + { + link->snd_settle_mode = snd_settle_mode; + result = 0; + } - return result; + return result; } int link_get_snd_settle_mode(LINK_HANDLE link, sender_settle_mode* snd_settle_mode) { - int result; + int result; - if ((link == NULL) || - (snd_settle_mode == NULL)) - { - result = __FAILURE__; - } - else - { - *snd_settle_mode = link->snd_settle_mode; + if ((link == NULL) || + (snd_settle_mode == NULL)) + { + result = __FAILURE__; + } + else + { + *snd_settle_mode = link->snd_settle_mode; - result = 0; - } + result = 0; + } - return result; + return result; } int link_set_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode rcv_settle_mode) { - int result; + int result; - if (link == NULL) - { - result = __FAILURE__; - } - else - { - link->rcv_settle_mode = rcv_settle_mode; - result = 0; - } + if (link == NULL) + { + result = __FAILURE__; + } + else + { + link->rcv_settle_mode = rcv_settle_mode; + result = 0; + } - return result; + return result; } int link_get_rcv_settle_mode(LINK_HANDLE link, receiver_settle_mode* rcv_settle_mode) { - int result; + int result; - if ((link == NULL) || - (rcv_settle_mode == NULL)) - { - result = __FAILURE__; - } - else - { - *rcv_settle_mode = link->rcv_settle_mode; - result = 0; - } + if ((link == NULL) || + (rcv_settle_mode == NULL)) + { + result = __FAILURE__; + } + else + { + *rcv_settle_mode = link->rcv_settle_mode; + result = 0; + } - return result; + return result; } int link_set_initial_delivery_count(LINK_HANDLE link, sequence_no initial_delivery_count) { - int result; + int result; - if (link == NULL) - { - result = __FAILURE__; - } - else - { - link->initial_delivery_count = initial_delivery_count; - result = 0; - } + if (link == NULL) + { + result = __FAILURE__; + } + else + { + link->initial_delivery_count = initial_delivery_count; + result = 0; + } - return result; + return result; } int link_get_initial_delivery_count(LINK_HANDLE link, sequence_no* initial_delivery_count) { - int result; + int result; - if ((link == NULL) || - (initial_delivery_count == NULL)) - { - result = __FAILURE__; - } - else - { - *initial_delivery_count = link->initial_delivery_count; - result = 0; - } + if ((link == NULL) || + (initial_delivery_count == NULL)) + { + result = __FAILURE__; + } + else + { + *initial_delivery_count = link->initial_delivery_count; + result = 0; + } - return result; + return result; } int link_set_max_message_size(LINK_HANDLE link, uint64_t max_message_size) { - int result; + int result; - if (link == NULL) - { - result = __FAILURE__; - } - else - { - link->max_message_size = max_message_size; - result = 0; - } + if (link == NULL) + { + result = __FAILURE__; + } + else + { + link->max_message_size = max_message_size; + result = 0; + } - return result; + return result; } int link_get_max_message_size(LINK_HANDLE link, uint64_t* max_message_size) { - int result; + int result; - if ((link == NULL) || - (max_message_size == NULL)) - { - result = __FAILURE__; - } - else - { - *max_message_size = link->max_message_size; - result = 0; - } + if ((link == NULL) || + (max_message_size == NULL)) + { + result = __FAILURE__; + } + else + { + *max_message_size = link->max_message_size; + result = 0; + } - return result; + return result; } int link_get_peer_max_message_size(LINK_HANDLE link, uint64_t* peer_max_message_size) { - int result; + int result; - if ((link == NULL) || - (peer_max_message_size == NULL)) - { - LogError("Bad arguments: link = %p, peer_max_message_size = %p", - link, peer_max_message_size); - result = __FAILURE__; - } - else if ((link->link_state != LINK_STATE_ATTACHED) && - (link->link_state != LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED)) - { - LogError("Attempting to read peer max message size before it was received"); - result = __FAILURE__; - } - else - { - *peer_max_message_size = link->peer_max_message_size; - result = 0; - } + if ((link == NULL) || + (peer_max_message_size == NULL)) + { + LogError("Bad arguments: link = %p, peer_max_message_size = %p", + link, peer_max_message_size); + result = __FAILURE__; + } + else if ((link->link_state != LINK_STATE_ATTACHED) && + (link->link_state != LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED)) + { + LogError("Attempting to read peer max message size before it was received"); + result = __FAILURE__; + } + else + { + *peer_max_message_size = link->peer_max_message_size; + result = 0; + } - return result; + return result; } int link_set_attach_properties(LINK_HANDLE link, fields attach_properties) @@ -958,66 +958,66 @@ int link_attach(LINK_HANDLE link, ON_TRANSFER_RECEIVED on_transfer_received, ON_LINK_STATE_CHANGED on_link_state_changed, ON_LINK_FLOW_ON on_link_flow_on, void* callback_context) { - int result; + int result; - if ((link == NULL) || + if ((link == NULL) || (link->is_closed)) - { - result = __FAILURE__; - } - else - { - if (!link->is_underlying_session_begun) - { - link->on_link_state_changed = on_link_state_changed; - link->on_transfer_received = on_transfer_received; - link->on_link_flow_on = on_link_flow_on; - link->callback_context = callback_context; + { + result = __FAILURE__; + } + else + { + if (!link->is_underlying_session_begun) + { + link->on_link_state_changed = on_link_state_changed; + link->on_transfer_received = on_transfer_received; + link->on_link_flow_on = on_link_flow_on; + link->callback_context = callback_context; - if (session_begin(link->session) != 0) - { - result = __FAILURE__; - } - else - { - link->is_underlying_session_begun = true; + if (session_begin(link->session) != 0) + { + result = __FAILURE__; + } + else + { + link->is_underlying_session_begun = true; - if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0) - { - result = __FAILURE__; - } - else - { + if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0) + { + result = __FAILURE__; + } + else + { link->received_payload_size = 0; - result = 0; - } - } - } - else - { - result = 0; - } - } + result = 0; + } + } + } + else + { + result = 0; + } + } - return result; + return result; } int link_detach(LINK_HANDLE link, bool close) { - int result; + int result; if ((link == NULL) || (link->is_closed)) { - result = __FAILURE__; - } - else - { + result = __FAILURE__; + } + else + { switch (link->link_state) { case LINK_STATE_HALF_ATTACHED_ATTACH_SENT: - case LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED: + case LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED: /* Sending detach when remote is not yet attached */ if (send_detach(link, close, NULL) != 0) { @@ -1054,130 +1054,130 @@ result = __FAILURE__; break; } - } + } - return result; + return result; } LINK_TRANSFER_RESULT link_transfer(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context) { - LINK_TRANSFER_RESULT result; + LINK_TRANSFER_RESULT result; - if (link == NULL) - { - result = LINK_TRANSFER_ERROR; - } - else - { - if ((link->role != role_sender) || - (link->link_state != LINK_STATE_ATTACHED)) - { - result = LINK_TRANSFER_ERROR; - } - else if (link->link_credit == 0) - { - result = LINK_TRANSFER_BUSY; - } - else - { - TRANSFER_HANDLE transfer = transfer_create(0); - if (transfer == NULL) - { - result = LINK_TRANSFER_ERROR; - } - else - { + if (link == NULL) + { + result = LINK_TRANSFER_ERROR; + } + else + { + if ((link->role != role_sender) || + (link->link_state != LINK_STATE_ATTACHED)) + { + result = LINK_TRANSFER_ERROR; + } + else if (link->link_credit == 0) + { + result = LINK_TRANSFER_BUSY; + } + else + { + TRANSFER_HANDLE transfer = transfer_create(0); + if (transfer == NULL) + { + result = LINK_TRANSFER_ERROR; + } + else + { sequence_no delivery_count = link->delivery_count + 1; unsigned char delivery_tag_bytes[sizeof(delivery_count)]; - delivery_tag delivery_tag; - bool settled; + delivery_tag delivery_tag; + bool settled; - (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count)); + (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count)); - delivery_tag.bytes = &delivery_tag_bytes; - delivery_tag.length = sizeof(delivery_tag_bytes); + delivery_tag.bytes = &delivery_tag_bytes; + delivery_tag.length = sizeof(delivery_tag_bytes); - if (link->snd_settle_mode == sender_settle_mode_unsettled) - { - settled = false; - } - else - { - settled = true; - } + if (link->snd_settle_mode == sender_settle_mode_unsettled) + { + settled = false; + } + else + { + settled = true; + } - if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) || - (transfer_set_message_format(transfer, message_format) != 0) || - (transfer_set_settled(transfer, settled) != 0)) - { - result = LINK_TRANSFER_ERROR; - } - else - { - AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer); + if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) || + (transfer_set_message_format(transfer, message_format) != 0) || + (transfer_set_settled(transfer, settled) != 0)) + { + result = LINK_TRANSFER_ERROR; + } + else + { + AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer); - if (transfer_value == NULL) - { - result = LINK_TRANSFER_ERROR; - } - else - { - DELIVERY_INSTANCE* pending_delivery = (DELIVERY_INSTANCE*)malloc(sizeof(DELIVERY_INSTANCE)); - if (pending_delivery == NULL) - { - result = LINK_TRANSFER_ERROR; - } - else - { - LIST_ITEM_HANDLE delivery_instance_list_item; - pending_delivery->on_delivery_settled = on_delivery_settled; - pending_delivery->callback_context = callback_context; - pending_delivery->link = link; - delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery); + if (transfer_value == NULL) + { + result = LINK_TRANSFER_ERROR; + } + else + { + DELIVERY_INSTANCE* pending_delivery = (DELIVERY_INSTANCE*)malloc(sizeof(DELIVERY_INSTANCE)); + if (pending_delivery == NULL) + { + result = LINK_TRANSFER_ERROR; + } + else + { + LIST_ITEM_HANDLE delivery_instance_list_item; + pending_delivery->on_delivery_settled = on_delivery_settled; + pending_delivery->callback_context = callback_context; + pending_delivery->link = link; + delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery); - if (delivery_instance_list_item == NULL) - { - free(pending_delivery); - result = LINK_TRANSFER_ERROR; - } - else - { - /* here we should feed data to the transfer frame */ - switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item)) - { - default: - case SESSION_SEND_TRANSFER_ERROR: - singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); - free(pending_delivery); - result = LINK_TRANSFER_ERROR; - break; + if (delivery_instance_list_item == NULL) + { + free(pending_delivery); + result = LINK_TRANSFER_ERROR; + } + else + { + /* here we should feed data to the transfer frame */ + switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item)) + { + default: + case SESSION_SEND_TRANSFER_ERROR: + singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + free(pending_delivery); + result = LINK_TRANSFER_ERROR; + break; - case SESSION_SEND_TRANSFER_BUSY: - /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ - singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); - free(pending_delivery); - result = LINK_TRANSFER_BUSY; - break; + case SESSION_SEND_TRANSFER_BUSY: + /* Ensure we remove from list again since sender will attempt to transfer again on flow on */ + singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item); + free(pending_delivery); + result = LINK_TRANSFER_BUSY; + break; - case SESSION_SEND_TRANSFER_OK: - link->delivery_count = delivery_count; - link->link_credit--; - result = LINK_TRANSFER_OK; - break; - } - } - } + case SESSION_SEND_TRANSFER_OK: + link->delivery_count = delivery_count; + link->link_credit--; + result = LINK_TRANSFER_OK; + break; + } + } + } - amqpvalue_destroy(transfer_value); - } - } + amqpvalue_destroy(transfer_value); + } + } - transfer_destroy(transfer); - } - } - } + transfer_destroy(transfer); + } + } + } - return result; + return result; } int link_get_name(LINK_HANDLE link, const char** link_name) @@ -1218,16 +1218,16 @@ { int result; if (delivery_state == NULL) - { - result = 0; - } - else { - result = send_disposition(link, message_id, delivery_state); + result = 0; + } + else + { + result = send_disposition(link, message_id, delivery_state); if ( result != 0) { LogError("Cannot send disposition frame"); - result = __FAILURE__; + result = __FAILURE__; } } return result;