Microsoft Azure IoTHub client libraries

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp f767zi_mqtt ... more

This library implements the Microsoft Azure IoTHub client library. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
66:a419827cb051
Parent:
57:4524910c6445
Child:
71:0d498da5ece1
--- a/iothubtransport.c	Fri Apr 21 14:50:19 2017 -0700
+++ b/iothubtransport.c	Mon May 08 10:50:31 2017 -0700
@@ -16,12 +16,14 @@
 
 typedef struct TRANSPORT_HANDLE_DATA_TAG
 {
-	TRANSPORT_LL_HANDLE transportLLHandle;
+    TRANSPORT_LL_HANDLE transportLLHandle;
     THREAD_HANDLE workerThreadHandle;
     LOCK_HANDLE lockHandle;
     sig_atomic_t stopThread;
-	TRANSPORT_PROVIDER_FIELDS;
-	VECTOR_HANDLE clients;
+    TRANSPORT_PROVIDER_FIELDS;
+    VECTOR_HANDLE clients;
+    LOCK_HANDLE clientsLockHandle;
+    IOTHUB_CLIENT_MULTIPLEXED_DO_WORK clientDoWork;
 } TRANSPORT_HANDLE_DATA;
 
 /* Used for Unit test */
@@ -29,342 +31,420 @@
 
 TRANSPORT_HANDLE  IoTHubTransport_Create(IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol, const char* iotHubName, const char* iotHubSuffix)
 {
-	TRANSPORT_HANDLE_DATA *result;
+    TRANSPORT_HANDLE_DATA *result;
 
-	if (protocol == NULL || iotHubName == NULL || iotHubSuffix == NULL)
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_002: [ If protocol is NULL, this function shall return NULL. ]*/
-		/*Codes_SRS_IOTHUBTRANSPORT_17_003: [ If iotHubName is NULL, this function shall return NULL. ]*/
-		/*Codes_SRS_IOTHUBTRANSPORT_17_004: [ If iotHubSuffix is NULL, this function shall return NULL. ]*/
-		LogError("Invalid NULL argument, protocol [%p], name [%p], suffix [%p].", protocol, iotHubName, iotHubSuffix);
-		result = NULL;
-	}
-	else
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_032: [ IoTHubTransport_Create shall allocate memory for the transport data. ]*/
-		result = (TRANSPORT_HANDLE_DATA*)malloc(sizeof(TRANSPORT_HANDLE_DATA));
-		if (result == NULL)
-		{
-			/*Codes_SRS_IOTHUBTRANSPORT_17_040: [ If memory allocation fails, IoTHubTransport_Create shall return NULL. ]*/
-			LogError("Transport handle was not allocated.");
-		}
-		else
-		{
-			TRANSPORT_PROVIDER * transportProtocol = (TRANSPORT_PROVIDER*)(protocol());
-			IOTHUB_CLIENT_CONFIG upperConfig;
-			upperConfig.deviceId = NULL;
-			upperConfig.deviceKey = NULL;
-			upperConfig.iotHubName = iotHubName;
-			upperConfig.iotHubSuffix = iotHubSuffix;
-			upperConfig.protocol = protocol;
-			upperConfig.protocolGatewayHostName = NULL;
+    if (protocol == NULL || iotHubName == NULL || iotHubSuffix == NULL)
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_002: [ If protocol is NULL, this function shall return NULL. ]*/
+        /*Codes_SRS_IOTHUBTRANSPORT_17_003: [ If iotHubName is NULL, this function shall return NULL. ]*/
+        /*Codes_SRS_IOTHUBTRANSPORT_17_004: [ If iotHubSuffix is NULL, this function shall return NULL. ]*/
+        LogError("Invalid NULL argument, protocol [%p], name [%p], suffix [%p].", protocol, iotHubName, iotHubSuffix);
+        result = NULL;
+    }
+    else
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_032: [ IoTHubTransport_Create shall allocate memory for the transport data. ]*/
+        result = (TRANSPORT_HANDLE_DATA*)malloc(sizeof(TRANSPORT_HANDLE_DATA));
+        if (result == NULL)
+        {
+            /*Codes_SRS_IOTHUBTRANSPORT_17_040: [ If memory allocation fails, IoTHubTransport_Create shall return NULL. ]*/
+            LogError("Transport handle was not allocated.");
+        }
+        else
+        {
+            TRANSPORT_PROVIDER * transportProtocol = (TRANSPORT_PROVIDER*)(protocol());
+            IOTHUB_CLIENT_CONFIG upperConfig;
+            upperConfig.deviceId = NULL;
+            upperConfig.deviceKey = NULL;
+            upperConfig.iotHubName = iotHubName;
+            upperConfig.iotHubSuffix = iotHubSuffix;
+            upperConfig.protocol = protocol;
+            upperConfig.protocolGatewayHostName = NULL;
 
-			IOTHUBTRANSPORT_CONFIG transportLLConfig;
-			transportLLConfig.upperConfig = &upperConfig;
-			transportLLConfig.waitingToSend = NULL;
+            IOTHUBTRANSPORT_CONFIG transportLLConfig;
+            memset(&transportLLConfig, 0, sizeof(IOTHUBTRANSPORT_CONFIG));
+            transportLLConfig.upperConfig = &upperConfig;
+            transportLLConfig.waitingToSend = NULL;
 
-			/*Codes_SRS_IOTHUBTRANSPORT_17_005: [ IoTHubTransport_Create shall create the lower layer transport by calling the protocol's IoTHubTransport_Create function. ]*/
-			result->transportLLHandle = transportProtocol->IoTHubTransport_Create(&transportLLConfig);
-			if (result->transportLLHandle == NULL)
-			{
-				/*Codes_SRS_IOTHUBTRANSPORT_17_006: [ If the creation of the transport fails, IoTHubTransport_Create shall return NULL. ]*/
-				LogError("Lower Layer transport not created.");
-				free(result);
-				result = NULL;
-			}
-			else
-			{
-				/*Codes_SRS_IOTHUBTRANSPORT_17_007: [ IoTHubTransport_Create shall create the transport lock by Calling Lock_Init. ]*/
-				result->lockHandle = Lock_Init();
-				if (result->lockHandle == NULL)
-				{
-					/*Codes_SRS_IOTHUBTRANSPORT_17_008: [ If the lock creation fails, IoTHubTransport_Create shall return NULL. ]*/
-					LogError("transport Lock not created.");
-					transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
-					free(result);
-					result = NULL;
-				}
-				else
-				{
-					/*Codes_SRS_IOTHUBTRANSPORT_17_038: [ IoTHubTransport_Create shall call VECTOR_Create to make a list of IOTHUB_CLIENT_HANDLE using this transport. ]*/
-					result->clients = VECTOR_create(sizeof(IOTHUB_CLIENT_HANDLE));
-					if (result->clients == NULL)
-					{
-						/*Codes_SRS_IOTHUBTRANSPORT_17_039: [ If the Vector creation fails, IoTHubTransport_Create shall return NULL. ]*/
-						/*Codes_SRS_IOTHUBTRANSPORT_17_009: [ IoTHubTransport_Create shall clean up any resources it creates if the function does not succeed. ]*/
-						LogError("clients list not created.");
-						Lock_Deinit(result->lockHandle);
-						transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
-						free(result);
-						result = NULL;
-					}
-					else
-					{
-						/*Codes_SRS_IOTHUBTRANSPORT_17_001: [ IoTHubTransport_Create shall return a non-NULL handle on success.]*/
-						result->stopThread = 1;
-						result->workerThreadHandle = NULL; /* create thread when work needs to be done */
+            /*Codes_SRS_IOTHUBTRANSPORT_17_005: [ IoTHubTransport_Create shall create the lower layer transport by calling the protocol's IoTHubTransport_Create function. ]*/
+            result->transportLLHandle = transportProtocol->IoTHubTransport_Create(&transportLLConfig);
+            if (result->transportLLHandle == NULL)
+            {
+                /*Codes_SRS_IOTHUBTRANSPORT_17_006: [ If the creation of the transport fails, IoTHubTransport_Create shall return NULL. ]*/
+                LogError("Lower Layer transport not created.");
+                free(result);
+                result = NULL;
+            }
+            else
+            {
+                /*Codes_SRS_IOTHUBTRANSPORT_17_007: [ IoTHubTransport_Create shall create the transport lock by Calling Lock_Init. ]*/
+                result->lockHandle = Lock_Init();
+                if (result->lockHandle == NULL)
+                {
+                    /*Codes_SRS_IOTHUBTRANSPORT_17_008: [ If the lock creation fails, IoTHubTransport_Create shall return NULL. ]*/
+                    LogError("transport Lock not created.");
+                    transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
+                    free(result);
+                    result = NULL;
+                }
+                else if ((result->clientsLockHandle = Lock_Init()) == NULL)
+                {
+                    LogError("clients Lock not created.");
+                    Lock_Deinit(result->lockHandle);
+                    transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
+                    free(result);
+                    result = NULL;
+                }
+                else
+                {
+                    /*Codes_SRS_IOTHUBTRANSPORT_17_038: [ IoTHubTransport_Create shall call VECTOR_Create to make a list of IOTHUB_CLIENT_HANDLE using this transport. ]*/
+                    result->clients = VECTOR_create(sizeof(IOTHUB_CLIENT_HANDLE));
+                    if (result->clients == NULL)
+                    {
+                        /*Codes_SRS_IOTHUBTRANSPORT_17_039: [ If the Vector creation fails, IoTHubTransport_Create shall return NULL. ]*/
+                        /*Codes_SRS_IOTHUBTRANSPORT_17_009: [ IoTHubTransport_Create shall clean up any resources it creates if the function does not succeed. ]*/
+                        LogError("clients list not created.");
+                        Lock_Deinit(result->clientsLockHandle);
+                        Lock_Deinit(result->lockHandle);
+                        transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
+                        free(result);
+                        result = NULL;
+                    }
+                    else
+                    {
+                        /*Codes_SRS_IOTHUBTRANSPORT_17_001: [ IoTHubTransport_Create shall return a non-NULL handle on success.]*/
+                        result->stopThread = 1;
+                        result->clientDoWork = NULL;
+                        result->workerThreadHandle = NULL; /* create thread when work needs to be done */
                         result->IoTHubTransport_GetHostname = transportProtocol->IoTHubTransport_GetHostname;
-						result->IoTHubTransport_SetOption = transportProtocol->IoTHubTransport_SetOption;
-						result->IoTHubTransport_Create = transportProtocol->IoTHubTransport_Create;
-						result->IoTHubTransport_Destroy = transportProtocol->IoTHubTransport_Destroy;
-						result->IoTHubTransport_Register = transportProtocol->IoTHubTransport_Register;
-						result->IoTHubTransport_Unregister = transportProtocol->IoTHubTransport_Unregister;
-						result->IoTHubTransport_Subscribe = transportProtocol->IoTHubTransport_Subscribe;
-						result->IoTHubTransport_Unsubscribe = transportProtocol->IoTHubTransport_Unsubscribe;
-						result->IoTHubTransport_DoWork = transportProtocol->IoTHubTransport_DoWork;
+                        result->IoTHubTransport_SetOption = transportProtocol->IoTHubTransport_SetOption;
+                        result->IoTHubTransport_Create = transportProtocol->IoTHubTransport_Create;
+                        result->IoTHubTransport_Destroy = transportProtocol->IoTHubTransport_Destroy;
+                        result->IoTHubTransport_Register = transportProtocol->IoTHubTransport_Register;
+                        result->IoTHubTransport_Unregister = transportProtocol->IoTHubTransport_Unregister;
+                        result->IoTHubTransport_Subscribe = transportProtocol->IoTHubTransport_Subscribe;
+                        result->IoTHubTransport_Unsubscribe = transportProtocol->IoTHubTransport_Unsubscribe;
+                        result->IoTHubTransport_DoWork = transportProtocol->IoTHubTransport_DoWork;
                         result->IoTHubTransport_SetRetryPolicy = transportProtocol->IoTHubTransport_SetRetryPolicy;
-						result->IoTHubTransport_GetSendStatus = transportProtocol->IoTHubTransport_GetSendStatus;
-					}
-				}
-			}
-		}
-	}
-	
-	return result;
+                        result->IoTHubTransport_GetSendStatus = transportProtocol->IoTHubTransport_GetSendStatus;
+                    }
+                }
+            }
+        }
+    }
+    
+    return result;
+}
+
+static void multiplexed_client_do_work(TRANSPORT_HANDLE_DATA* transportData)
+{
+    if (Lock(transportData->clientsLockHandle) != LOCK_OK)
+    {
+        LogError("failed to lock for multiplexed_client_do_work");
+    }
+    else
+    {
+        size_t numberOfClients;
+        size_t iterator;
+
+        numberOfClients = VECTOR_size(transportData->clients);
+        for (iterator = 0; iterator < numberOfClients; iterator++)
+        {
+            IOTHUB_CLIENT_HANDLE* clientHandle = (IOTHUB_CLIENT_HANDLE*)VECTOR_element(transportData->clients, iterator);
+
+            if (clientHandle != NULL)
+            {
+                transportData->clientDoWork(*clientHandle);
+            }
+        }
+
+        if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
+        {
+            LogError("failed to unlock on multiplexed_client_do_work");
+        }
+    }
 }
 
 static int transport_worker_thread(void* threadArgument)
 {
-	TRANSPORT_HANDLE_DATA* transportData = (TRANSPORT_HANDLE_DATA*)threadArgument;
+    TRANSPORT_HANDLE_DATA* transportData = (TRANSPORT_HANDLE_DATA*)threadArgument;
 
-	while (1)
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_030: [ All calls to lower layer transport DoWork shall be protected by the lock created in IoTHubTransport_Create. ]*/
-		if (Lock(transportData->lockHandle) == LOCK_OK)
-		{
-			/*Codes_SRS_IOTHUBTRANSPORT_17_031: [ If acquiring the lock fails, lower layer transport DoWork shall not be called. ]*/
-			if (transportData->stopThread)
-			{
-				/*Codes_SRS_IOTHUBTRANSPORT_17_028: [ The thread shall exit when IoTHubTransport_EndWorkerThread has been called for each clientHandle which invoked IoTHubTransport_StartWorkerThread. ]*/
-				(void)Unlock(transportData->lockHandle);
-				break;
-			}
-			else
-			{
-				(transportData->IoTHubTransport_DoWork)(transportData->transportLLHandle, NULL);
-				(void)Unlock(transportData->lockHandle);
-			}
-		}
-		/*Codes_SRS_IOTHUBTRANSPORT_17_029: [ The thread shall call lower layer transport DoWork every 1 ms. ]*/
-		ThreadAPI_Sleep(1);
-	}
+    while (1)
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_030: [ All calls to lower layer transport DoWork shall be protected by the lock created in IoTHubTransport_Create. ]*/
+        if (Lock(transportData->lockHandle) == LOCK_OK)
+        {
+            /*Codes_SRS_IOTHUBTRANSPORT_17_031: [ If acquiring the lock fails, lower layer transport DoWork shall not be called. ]*/
+            if (transportData->stopThread)
+            {
+                /*Codes_SRS_IOTHUBTRANSPORT_17_028: [ The thread shall exit when IoTHubTransport_EndWorkerThread has been called for each clientHandle which invoked IoTHubTransport_StartWorkerThread. ]*/
+                (void)Unlock(transportData->lockHandle);
+                break;
+            }
+            else
+            {
+                (transportData->IoTHubTransport_DoWork)(transportData->transportLLHandle, NULL);
 
-	return 0;
+                (void)Unlock(transportData->lockHandle);
+            }
+        }
+
+        multiplexed_client_do_work(transportData);
+
+        /*Codes_SRS_IOTHUBTRANSPORT_17_029: [ The thread shall call lower layer transport DoWork every 1 ms. ]*/
+        ThreadAPI_Sleep(1);
+    }
+
+    return 0;
 }
 
 static bool find_by_handle(const void* element, const void* value)
 {
-	/* data stored at element is device handle */
-	const IOTHUB_CLIENT_HANDLE * guess = (const IOTHUB_CLIENT_HANDLE *)element;
-	const IOTHUB_CLIENT_HANDLE match = (const IOTHUB_CLIENT_HANDLE)value;
-	return (*guess == match);
+    /* data stored at element is device handle */
+    const IOTHUB_CLIENT_HANDLE * guess = (const IOTHUB_CLIENT_HANDLE *)element;
+    const IOTHUB_CLIENT_HANDLE match = (const IOTHUB_CLIENT_HANDLE)value;
+    return (*guess == match);
 }
 
 static IOTHUB_CLIENT_RESULT start_worker_if_needed(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_HANDLE clientHandle)
 {
-	IOTHUB_CLIENT_RESULT result;
-	if (transportData->workerThreadHandle == NULL)
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_018: [ If the worker thread does not exist, IoTHubTransport_StartWorkerThread shall start the thread using ThreadAPI_Create. ]*/
-		transportData->stopThread = 0;
-		if (ThreadAPI_Create(&transportData->workerThreadHandle, transport_worker_thread, transportData) != THREADAPI_OK)
-		{
-			transportData->workerThreadHandle = NULL;
-		}
-	}
-	if (transportData->workerThreadHandle != NULL)
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_020: [ IoTHubTransport_StartWorkerThread shall search for IoTHubClient clientHandle in the list of IoTHubClient handles. ]*/
-		bool addToList = ((VECTOR_size(transportData->clients) == 0) || (VECTOR_find_if(transportData->clients, find_by_handle, clientHandle) == NULL));
-		if (addToList)
-		{
-			/*Codes_SRS_IOTHUBTRANSPORT_17_021: [ If handle is not found, then clientHandle shall be added to the list. ]*/
-			if (VECTOR_push_back(transportData->clients, &clientHandle, 1) != 0)
-			{
-				/*Codes_SRS_IOTHUBTRANSPORT_17_042: [ If Adding to the client list fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. ]*/
-				result = IOTHUB_CLIENT_ERROR;
-			}
-			else
-			{
-				result = IOTHUB_CLIENT_OK;
-			}
-		}
-		else
-		{
-			result = IOTHUB_CLIENT_OK;
-		}
-	}
-	else
-	{
-		result = IOTHUB_CLIENT_ERROR;
-	}
-	return result;
+    IOTHUB_CLIENT_RESULT result;
+    if (transportData->workerThreadHandle == NULL)
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_018: [ If the worker thread does not exist, IoTHubTransport_StartWorkerThread shall start the thread using ThreadAPI_Create. ]*/
+        transportData->stopThread = 0;
+        if (ThreadAPI_Create(&transportData->workerThreadHandle, transport_worker_thread, transportData) != THREADAPI_OK)
+        {
+            transportData->workerThreadHandle = NULL;
+        }
+    }
+    if (transportData->workerThreadHandle != NULL)
+    {
+        if (Lock(transportData->clientsLockHandle) != LOCK_OK)
+        {
+            LogError("failed to lock for start_worker_if_needed");
+            result = IOTHUB_CLIENT_ERROR;
+        }
+        else
+        {
+            /*Codes_SRS_IOTHUBTRANSPORT_17_020: [ IoTHubTransport_StartWorkerThread shall search for IoTHubClient clientHandle in the list of IoTHubClient handles. ]*/
+            bool addToList = ((VECTOR_size(transportData->clients) == 0) || (VECTOR_find_if(transportData->clients, find_by_handle, clientHandle) == NULL));
+            if (addToList)
+            {
+                /*Codes_SRS_IOTHUBTRANSPORT_17_021: [ If handle is not found, then clientHandle shall be added to the list. ]*/
+                if (VECTOR_push_back(transportData->clients, &clientHandle, 1) != 0)
+                {
+                    LogError("Failed adding device to list (VECTOR_push_back failed)");
+                    /*Codes_SRS_IOTHUBTRANSPORT_17_042: [ If Adding to the client list fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. ]*/
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                else
+                {
+                    result = IOTHUB_CLIENT_OK;
+                }
+            }
+            else
+            {
+                result = IOTHUB_CLIENT_OK;
+            }
+
+            if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
+            {
+                LogError("failed to unlock on start_worker_if_needed");
+            }
+        }
+    }
+    else
+    {
+        result = IOTHUB_CLIENT_ERROR;
+    }
+    return result;
 }
 
 static void stop_worker_thread(TRANSPORT_HANDLE_DATA * transportData)
 {
-	/*Codes_SRS_IOTHUBTRANSPORT_17_043: [** IoTHubTransport_SignalEndWorkerThread shall signal the worker thread to end.*/
-	transportData->stopThread = 1;
+    /*Codes_SRS_IOTHUBTRANSPORT_17_043: [** IoTHubTransport_SignalEndWorkerThread shall signal the worker thread to end.*/
+    transportData->stopThread = 1;
 }
 
 static void wait_worker_thread(TRANSPORT_HANDLE_DATA * transportData)
 {
-	if (transportData->workerThreadHandle != NULL)
-	{
-		int res;
-		/*Codes_SRS_IOTHUBTRANSPORT_17_027: [ If handle list is empty, IoTHubTransport_EndWorkerThread shall be joined. ]*/
-		if (ThreadAPI_Join(transportData->workerThreadHandle, &res) != THREADAPI_OK)
-		{
-			LogError("ThreadAPI_Join failed");
-		}
-		else
-		{
-			transportData->workerThreadHandle = NULL;
-		}
-	}
+    if (transportData->workerThreadHandle != NULL)
+    {
+        int res;
+        /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ If handle list is empty, IoTHubTransport_EndWorkerThread shall be joined. ]*/
+        if (ThreadAPI_Join(transportData->workerThreadHandle, &res) != THREADAPI_OK)
+        {
+            LogError("ThreadAPI_Join failed");
+        }
+        else
+        {
+            transportData->workerThreadHandle = NULL;
+        }
+    }
 }
 
 static bool signal_end_worker_thread(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_HANDLE clientHandle)
 {
-	bool okToJoin;
-	void* element = VECTOR_find_if(transportData->clients, find_by_handle, clientHandle);
-	if (element != NULL)
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_026: [ IoTHubTransport_EndWorkerThread shall remove clientHandlehandle from handle list. ]*/
-		VECTOR_erase(transportData->clients, element, 1);
-	}
-	/*Codes_SRS_IOTHUBTRANSPORT_17_025: [ If the worker thread does not exist, then IoTHubTransport_EndWorkerThread shall return. ]*/
-	if (transportData->workerThreadHandle != NULL)
-	{
-		if (VECTOR_size(transportData->clients) == 0)
-		{
-			stop_worker_thread(transportData);
-			okToJoin = true;
-		}
-		else
-		{
-			okToJoin = false;
-		}
-	}
-	else
-	{
-		okToJoin = false;
-	}
-	return okToJoin;
+    bool okToJoin;
+
+    if (Lock(transportData->clientsLockHandle) != LOCK_OK)
+    {
+        LogError("failed to lock for signal_end_worker_thread");
+        okToJoin = false;
+    }
+    else
+    {
+        void* element = VECTOR_find_if(transportData->clients, find_by_handle, clientHandle);
+        if (element != NULL)
+        {
+            /*Codes_SRS_IOTHUBTRANSPORT_17_026: [ IoTHubTransport_EndWorkerThread shall remove clientHandlehandle from handle list. ]*/
+            VECTOR_erase(transportData->clients, element, 1);
+        }
+        /*Codes_SRS_IOTHUBTRANSPORT_17_025: [ If the worker thread does not exist, then IoTHubTransport_EndWorkerThread shall return. ]*/
+        if (transportData->workerThreadHandle != NULL)
+        {
+            if (VECTOR_size(transportData->clients) == 0)
+            {
+                stop_worker_thread(transportData);
+                okToJoin = true;
+            }
+            else
+            {
+                okToJoin = false;
+            }
+        }
+        else
+        {
+            okToJoin = false;
+        }
+
+        if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
+        {
+            LogError("failed to unlock on signal_end_worker_thread");
+        }
+    }
+    return okToJoin;
 }
 
 void IoTHubTransport_Destroy(TRANSPORT_HANDLE transportHandle)
 {
-	/*Codes_SRS_IOTHUBTRANSPORT_17_011: [ IoTHubTransport_Destroy shall do nothing if transportHandle is NULL. ]*/
-	if (transportHandle != NULL)
-	{
-		TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
-		/*Codes_SRS_IOTHUBTRANSPORT_17_033: [ IoTHubTransport_Destroy shall lock the transport lock. ]*/
-		if (Lock(transportData->lockHandle) != LOCK_OK)
-		{
-			LogError("Unable to lock - will still attempt to end thread without thread safety");
-			stop_worker_thread(transportData);
-		}
-		else
-		{
-			stop_worker_thread(transportData);
-			(void)Unlock(transportData->lockHandle);
-		}
-		wait_worker_thread(transportData);
-		/*Codes_SRS_IOTHUBTRANSPORT_17_010: [ IoTHubTransport_Destroy shall free all resources. ]*/
-		Lock_Deinit(transportData->lockHandle);
-		(transportData->IoTHubTransport_Destroy)(transportData->transportLLHandle);
-		VECTOR_destroy(transportData->clients);
-		free(transportHandle);
-	}
+    /*Codes_SRS_IOTHUBTRANSPORT_17_011: [ IoTHubTransport_Destroy shall do nothing if transportHandle is NULL. ]*/
+    if (transportHandle != NULL)
+    {
+        TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
+        /*Codes_SRS_IOTHUBTRANSPORT_17_033: [ IoTHubTransport_Destroy shall lock the transport lock. ]*/
+        if (Lock(transportData->lockHandle) != LOCK_OK)
+        {
+            LogError("Unable to lock - will still attempt to end thread without thread safety");
+            stop_worker_thread(transportData);
+        }
+        else
+        {
+            stop_worker_thread(transportData);
+            (void)Unlock(transportData->lockHandle);
+        }
+        wait_worker_thread(transportData);
+        /*Codes_SRS_IOTHUBTRANSPORT_17_010: [ IoTHubTransport_Destroy shall free all resources. ]*/
+        Lock_Deinit(transportData->lockHandle);
+        (transportData->IoTHubTransport_Destroy)(transportData->transportLLHandle);
+        VECTOR_destroy(transportData->clients);
+        Lock_Deinit(transportData->clientsLockHandle);
+        free(transportHandle);
+    }
 }
 
 LOCK_HANDLE IoTHubTransport_GetLock(TRANSPORT_HANDLE transportHandle)
 {
-	LOCK_HANDLE lock;
-	if (transportHandle == NULL)
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_013: [ If transportHandle is NULL, IoTHubTransport_GetLock shall return NULL. ]*/
-		lock = NULL;
-	}
-	else
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_012: [ IoTHubTransport_GetLock shall return a handle to the transport lock. ]*/
-		TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
-		lock = transportData->lockHandle;
-	}
-	return lock;
+    LOCK_HANDLE lock;
+    if (transportHandle == NULL)
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_013: [ If transportHandle is NULL, IoTHubTransport_GetLock shall return NULL. ]*/
+        lock = NULL;
+    }
+    else
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_012: [ IoTHubTransport_GetLock shall return a handle to the transport lock. ]*/
+        TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
+        lock = transportData->lockHandle;
+    }
+    return lock;
 }
 
 TRANSPORT_LL_HANDLE IoTHubTransport_GetLLTransport(TRANSPORT_HANDLE transportHandle)
 {
-	TRANSPORT_LL_HANDLE llTransport;
-	if (transportHandle == NULL)
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_015: [ If transportHandle is NULL, IoTHubTransport_GetLLTransport shall return NULL. ]*/
-		llTransport = NULL;
-	}
-	else
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_014: [ IoTHubTransport_GetLLTransport shall return a handle to the lower layer transport. ]*/
-		TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
-		llTransport = transportData->transportLLHandle;
-	}
-	return llTransport;
+    TRANSPORT_LL_HANDLE llTransport;
+    if (transportHandle == NULL)
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_015: [ If transportHandle is NULL, IoTHubTransport_GetLLTransport shall return NULL. ]*/
+        llTransport = NULL;
+    }
+    else
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_014: [ IoTHubTransport_GetLLTransport shall return a handle to the lower layer transport. ]*/
+        TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
+        llTransport = transportData->transportLLHandle;
+    }
+    return llTransport;
 }
 
-IOTHUB_CLIENT_RESULT IoTHubTransport_StartWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_HANDLE clientHandle)
+IOTHUB_CLIENT_RESULT IoTHubTransport_StartWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_HANDLE clientHandle, IOTHUB_CLIENT_MULTIPLEXED_DO_WORK muxDoWork)
 {
-	IOTHUB_CLIENT_RESULT result;
-	if (transportHandle == NULL || clientHandle == NULL)
-	{
-		/*Codes_SRS_IOTHUBTRANSPORT_17_016: [ If transportHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
-		/*Codes_SRS_IOTHUBTRANSPORT_17_017: [ If clientHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
-		result = IOTHUB_CLIENT_INVALID_ARG;
-	}
-	else
-	{
-		TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
+    IOTHUB_CLIENT_RESULT result;
+    if (transportHandle == NULL || clientHandle == NULL)
+    {
+        /*Codes_SRS_IOTHUBTRANSPORT_17_016: [ If transportHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
+        /*Codes_SRS_IOTHUBTRANSPORT_17_017: [ If clientHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
+        result = IOTHUB_CLIENT_INVALID_ARG;
+    }
+    else
+    {
+        TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
 
-		if ((result = start_worker_if_needed(transportData, clientHandle)) != IOTHUB_CLIENT_OK)
-		{
-			/*Codes_SRS_IOTHUBTRANSPORT_17_019: [ If thread creation fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. */
-			LogError("Unable to start thread safely");
-		}
-		else
-		{
-			/*Codes_SRS_IOTHUBTRANSPORT_17_022: [ Upon success, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_OK. ]*/
-			result = IOTHUB_CLIENT_OK;
-		}
-	}
-	return result;
+        if (transportData->clientDoWork == NULL)
+        {
+            transportData->clientDoWork = muxDoWork;
+        }
+
+        if ((result = start_worker_if_needed(transportData, clientHandle)) != IOTHUB_CLIENT_OK)
+        {
+            /*Codes_SRS_IOTHUBTRANSPORT_17_019: [ If thread creation fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. */
+            LogError("Unable to start thread safely");
+        }
+        else
+        {
+            /*Codes_SRS_IOTHUBTRANSPORT_17_022: [ Upon success, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_OK. ]*/
+            result = IOTHUB_CLIENT_OK;
+        }
+    }
+    return result;
 }
 
 bool IoTHubTransport_SignalEndWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_HANDLE clientHandle)
 {
-	bool okToJoin;
-	/*Codes_SRS_IOTHUBTRANSPORT_17_023: [ If transportHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/
-	/*Codes_SRS_IOTHUBTRANSPORT_17_024: [ If clientHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/
-	if (!(transportHandle == NULL || clientHandle == NULL))
-	{
-		TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
-		okToJoin = signal_end_worker_thread(transportData, clientHandle);
-	}
-	else
-	{
-		okToJoin = false;
-	}
-	return okToJoin;
+    bool okToJoin;
+    /*Codes_SRS_IOTHUBTRANSPORT_17_023: [ If transportHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/
+    /*Codes_SRS_IOTHUBTRANSPORT_17_024: [ If clientHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/
+    if (!(transportHandle == NULL || clientHandle == NULL))
+    {
+        TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
+        okToJoin = signal_end_worker_thread(transportData, clientHandle);
+    }
+    else
+    {
+        okToJoin = false;
+    }
+    return okToJoin;
 }
 
 void IoTHubTransport_JoinWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_HANDLE clientHandle)
 {
-	/*Codes_SRS_IOTHUBTRANSPORT_17_044: [ If transportHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/
-	/*Codes_SRS_IOTHUBTRANSPORT_17_045: [ If clientHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/
-	if (!(transportHandle == NULL || clientHandle == NULL))
-	{
-		TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
-		/*Codes_SRS_IOTHUBTRANSPORT_17_027: [ The worker thread shall be joined. ]*/
-		wait_worker_thread(transportData);
-	}
+    /*Codes_SRS_IOTHUBTRANSPORT_17_044: [ If transportHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/
+    /*Codes_SRS_IOTHUBTRANSPORT_17_045: [ If clientHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/
+    if (!(transportHandle == NULL || clientHandle == NULL))
+    {
+        TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
+        /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ The worker thread shall be joined. ]*/
+        wait_worker_thread(transportData);
+    }
 }