A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: link.c
- Revision:
- 13:9abd748f4e78
- Parent:
- 12:b30dacf113f2
- Child:
- 17:923575db8b2d
--- a/link.c Thu Oct 20 17:07:44 2016 -0700 +++ b/link.c Wed Nov 16 21:38:05 2016 -0800 @@ -309,6 +309,7 @@ { AMQP_VALUE delivery_state; bool more; + bool is_error; link_instance->link_credit--; link_instance->delivery_count++; @@ -318,77 +319,73 @@ send_flow(link_instance); } - if (transfer_get_more(transfer_handle, &more) != 0) - { - LogError("Could not get the more field from the transfer performative"); - } - else - { - bool is_error = false; + more = false; + /* Attempt to get more flag, default to false */ + (void)transfer_get_more(transfer_handle, &more); + is_error = false; - if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0) + if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0) + { + /* is this not a continuation transfer? */ + if (link_instance->received_payload_size == 0) { - /* is this not a continuation transfer? */ - if (link_instance->received_payload_size == 0) + LogError("Could not get the delivery Id from the transfer performative"); + is_error = true; + } + } + + if (!is_error) + { + /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */ + if ((link_instance->received_payload_size > 0) || more) + { + unsigned char* new_received_payload = (unsigned char*)realloc(link_instance->received_payload, link_instance->received_payload_size + payload_size); + if (new_received_payload == NULL) { - LogError("Could not get the delivery Id from the transfer performative"); - is_error = true; + LogError("Could not allocate memory for the received payload"); + } + else + { + link_instance->received_payload = new_received_payload; + (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size); + link_instance->received_payload_size += payload_size; } } - - if (!is_error) + + if (!more) { - /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */ - if ((link_instance->received_payload_size > 0) || more) + const unsigned char* indicate_payload_bytes; + uint32_t indicate_payload_size; + + /* if no previously stored chunks then simply report the current payload */ + if (link_instance->received_payload_size > 0) { - unsigned char* new_received_payload = (unsigned char*)realloc(link_instance->received_payload, link_instance->received_payload_size + payload_size); - if (new_received_payload == NULL) - { - LogError("Could not allocate memory for the received payload"); - } - else - { - link_instance->received_payload = new_received_payload; - (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size); - link_instance->received_payload_size += payload_size; - } + indicate_payload_size = link_instance->received_payload_size; + indicate_payload_bytes = link_instance->received_payload; + } + else + { + indicate_payload_size = payload_size; + indicate_payload_bytes = payload_bytes; } - if (!more) - { - const unsigned char* indicate_payload_bytes; - uint32_t indicate_payload_size; + delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes); - /* if no previously stored chunks then simply report the current payload */ - if (link_instance->received_payload_size > 0) - { - indicate_payload_size = link_instance->received_payload_size; - indicate_payload_bytes = link_instance->received_payload; - } - else - { - indicate_payload_size = payload_size; - indicate_payload_bytes = payload_bytes; - } + if (link_instance->received_payload_size > 0) + { + free(link_instance->received_payload); + link_instance->received_payload = NULL; + link_instance->received_payload_size = 0; + } - delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes); - - if (link_instance->received_payload_size > 0) - { - free(link_instance->received_payload); - link_instance->received_payload = NULL; - link_instance->received_payload_size = 0; - } + if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0) + { + LogError("Cannot send disposition frame"); + } - if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0) - { - LogError("Cannot send disposition frame"); - } - - if (delivery_state != NULL) - { - amqpvalue_destroy(delivery_state); - } + if (delivery_state != NULL) + { + amqpvalue_destroy(delivery_state); } } }