WIP. send a large constant string twice a second, in order to test out the transport with something indicative of our required load.

Dependencies:   FXOS8700CQ NTPClient azure_umqtt_c iothub_mqtt_transport mbed-rtos mbed wolfSSL Socket lwip-eth lwip-sys lwip

Fork of FXOS8700CQ_To_Azure_IoT by Mark Radbourne

main.cpp

Committer:
julianhigginson
Date:
2017-01-05
Revision:
7:0d1a0fe537dc
Parent:
6:66576aa3d4e3

File content as of revision 7:0d1a0fe537dc:

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file at https://github.com/Azure/azure-iot-sdks/blob/master/LICENSE for full license information.

/* -------------------------------------------------------------------------- *\

   Simple progam to demonstrate reading the FRDM-K64F FXOS8700CQ 
   accelerometer, convert the data to JSON and send to an Azure IoT Hub. You 
   must provide your hub's connection string in the variable 
   'connectionString'.

   markrad

\* -------------------------------------------------------------------------- */

#include <string.h>

#include "SingletonFXOS8700CQ.h"

#include "iothub_client.h"
#include "iothub_message.h"
#include "azure_c_shared_utility/threadapi.h"
#include "azure_c_shared_utility/crt_abstractions.h"
#include "azure_c_shared_utility/platform.h"
#include "iothubtransportmqtt.h"
#include "lock.h"

#include "certs.h"

//JH A dummy string to send every 0.5 sec.
//#define dummystring "{\"timestamp\":12345678901234567890,\"device\":\"JulianTestSensor\",\"A\":[+0000,-0002,+0001],\"B\":[-0002,-0001,-0004],\"C\":[-0003,-0002,-0002],\"D\":[-0002,-0003,-0004],\"E\":[-0002,-0002,-0002],\"F\":[-0002,-0004,-0001],\"G\":[-0001,-0002,-0004],\"H\":[-0003,-0002,-0003],\"I\":[-0003,-0002,+0000],\"J\":[-0002,-0003,-0001],\"K\":[-0003,-0001,-0002],\"L\":[-0001,-0003,-0003],\"M\":[-0003,-0003,-0004],\"N\":[-0001,-0003,-0004],\"O\":[-0003,-0002,+0000],\"P\":[-0001,-0002,-0004],\"Q\":[-0003,-0003,+0000],\"R\":[-0002,-0003,-0002],\"S\":[-0001,-0002,-0004],\"T\":[-0002,-0003,-0003],\"U\":[-0005,-0001,-0002],\"V\":[+0000,-0003,+0000],\"W\":[-0003,-0002,-0008],\"X\":[+0000,-0003,+0000]}"  
#define dummystring "{\"timestamp\":12345678901234567890,\"device\":\"JulianTestSensor\",\"largest magnitude\":1234}"  

int readingToJSON(char *buffer, int bufferlen, READING &reading)
{
    static const char READING[] = "\"reading\"";
    static const char ACCELEROMETER[] = "\"accelerometer\"";
    static const char MAGNOMETER[] = "\"magnometer\"";
    static const char X[] = "\"X\"";
    static const char Y[] = "\"Y\"";
    static const char Z[] = "\"Z\"";
    static const char STARTOBJ[] = " : {\n";
    static const char ENDOBJ[] = "}\n";
    static const char PREPEND[] = "{\n";
    static const int MINBUFFERLEN = 
        sizeof(READING) + 
        sizeof(ACCELEROMETER) +
        sizeof(MAGNOMETER) +
        2 * (sizeof(X) + sizeof(Y) + sizeof(Z)) +
        3 * sizeof(STARTOBJ) +
        4 * sizeof(ENDOBJ) +
        sizeof(PREPEND) +
        6 * 9;
    static const char numConvert[] = "%d";
    
    char toNum[10];
    char work[MINBUFFERLEN + 1];
    
    if (buffer == NULL)
        return 0;
    
    buffer[0] = '\0';
        
    strcpy(work, PREPEND);
    strcat(work, READING);
    strcat(work, STARTOBJ);
    strcat(work, ACCELEROMETER);
    strcat(work, STARTOBJ);
    strcat(work, X);
    strcat(work, " : ");
    sprintf(toNum, numConvert, reading.accelerometer.x);
    strcat(work, toNum);
    strcat(work, ",\n");
    strcat(work, Y);
    strcat(work, " : ");
    sprintf(toNum, numConvert, reading.accelerometer.y);
    strcat(work, toNum);
    strcat(work, ",\n");
    strcat(work, Z);
    strcat(work, " : ");
    sprintf(toNum, numConvert, reading.accelerometer.z);
    strcat(work, toNum);
    strcat(work, "\n");
    strcat(work, ENDOBJ);
    strcat(work, MAGNOMETER);
    strcat(work, STARTOBJ);
    strcat(work, X);
    strcat(work, " : ");
    sprintf(toNum, numConvert, reading.magnometer.x);
    strcat(work, toNum);
    strcat(work, ",\n");
    strcat(work, Y);
    strcat(work, " : ");
    sprintf(toNum, numConvert, reading.magnometer.y);
    strcat(work, toNum);
    strcat(work, ",\n");
    strcat(work, Z);
    strcat(work, " : ");
    sprintf(toNum, numConvert, reading.magnometer.z);
    strcat(work, toNum);
    strcat(work, "\n");
    strcat(work, ENDOBJ);
    strcat(work, ENDOBJ);
    strcat(work, ENDOBJ);
    
    //if (strlen(work) + 1 < bufferlen)
    //    strcpy(buffer, work); 
    strcpy(buffer,dummystring);  //JH send dummystring
    //return strlen(work);
    return strlen(dummystring); //JH send dummystring
}

static LOCK_HANDLE msgLock;
static int msgCount = 0;
static Timer t;
static int CONNECTIONTIMEOUT = (20 * 1000);

static IOTHUBMESSAGE_DISPOSITION_RESULT ReceiveMessageCallback(IOTHUB_MESSAGE_HANDLE message, void* userContextCallback)
{
    int* counter = (int*)userContextCallback;
    const char* buffer;
    size_t size;

    if (IoTHubMessage_GetByteArray(message, (const unsigned char**)&buffer, &size) != IOTHUB_MESSAGE_OK)
    {
        (void)printf("unable to retrieve the message data\r\n");
    }
    else
    {
        (void)printf("Received Message [%d] with Data: <<<%.*s>>> & Size=%d\r\n", *counter, (int)size, buffer, (int)size);
    }

    // Some device specific action code goes here...
    (*counter)++;

    return IOTHUBMESSAGE_ACCEPTED;
}

static void SendConfirmationCallback(IOTHUB_CLIENT_CONFIRMATION_RESULT result, void* userContextCallback)
{
    int* messageTrackingId = (int*)userContextCallback;
    
    (void)printf("Confirmation received for message tracking id = %d with result = %s\r\n", 
        *messageTrackingId, ENUM_TO_STRING(IOTHUB_CLIENT_CONFIRMATION_RESULT, result));
        
    free(userContextCallback);
    Lock(msgLock);
    msgCount--;
        
    if (result == IOTHUB_CLIENT_CONFIRMATION_OK)
    {
        t.stop();
        t.reset();
    }

    Unlock(msgLock);

}

void stall(Serial &pc, char *message)
{
    printf(message);
    printf("stalled ");
    
    while(true) {
        pc.putc('.'); // idle dots
        wait(1.0);
    }
}

IOTHUB_CLIENT_HANDLE setupConnection(Serial &pc, const char *connectionString, IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol, void *receiveContext)
{
    IOTHUB_CLIENT_HANDLE iotHubClientHandle = NULL;
    
    printf("Calling platform_init\r\n");
    
    while (platform_init())
    {
        pc.putc('P');
        wait(1.0);
        platform_deinit();
    }
    
//    if (platform_init() != 0)
//        stall(pc, "Failed to initialize platform\n");
        
    printf("Calling IoTHubClient_CreateFromConnectionString\r\n");
    if ((iotHubClientHandle = IoTHubClient_CreateFromConnectionString(connectionString, protocol)) == NULL)
        stall(pc, "ERROR: Could not create iotHubClientHandle\n");
        
    bool traceOn = false;
    //bool traceOn = true;
    
    printf("Calling IoTHubClient_SetOption logtrace with %d\r\n", traceOn);
    IoTHubClient_SetOption(iotHubClientHandle, "logtrace", &traceOn);

    // For mbed add the certificate information
    printf("Calling IoTHubClient_SetOption TrustedCerts\r\n");

    if (IoTHubClient_SetOption(iotHubClientHandle, "TrustedCerts", certificates) != IOTHUB_CLIENT_OK)
        stall(pc, "ERROR: failure to set option \"TrustedCerts\"\n");

    printf("Calling IoTHubClient_SetMessageCallback\r\n");

    if (IoTHubClient_SetMessageCallback(iotHubClientHandle, ReceiveMessageCallback, receiveContext) != IOTHUB_CLIENT_OK)
        stall(pc, "ERROR: IoTHubClient_SetMessageCallback failed\r\n");
    
    return iotHubClientHandle;
}

void terminateConnection(Serial &pc, IOTHUB_CLIENT_HANDLE iotHubClientHandle)
{
    printf("Calling IoTHubClient_Destroy\r\n");
    IoTHubClient_Destroy(iotHubClientHandle);
    printf("Calling platform_deinit\r\n");
    platform_deinit();
    printf("Connection terminated\r\n");
}    


int main()
{
    //Mark's IoThub - Mark
    //const char *connectionString = "HostName=MarkRadML.azure-devices.net;DeviceId=MBEDTest;SharedAccessKey=rgxlnR0rIBW4vnnnDkrbAv+mSOc/Mt60mg1CEjLx7pY=";

    //Julian's IoThub - Julian
    const char *connectionString = "HostName=MoI2.azure-devices.net;DeviceId=JulianSensor1;SharedAccessKey=tMJGdbSw6fmKO6tULERuPIBhq1dcJHbrlTHoVm+o0W4="; 

    //Julian's IoThub - Andy
    //const char *connectionString = "HostName=MoI2.azure-devices.net;DeviceId=AndySensor1;SharedAccessKey=Zc08nWRiXh+4tLdE5Zi3Na4I0+pN9N4dnHl7SJqg430=";

    READING reading;
    Serial pc(USBTX, USBRX); // Primary output to demonstrate library
    SingletonFXOS8700CQ &sfxos = SingletonFXOS8700CQ::getInstance();
    IOTHUB_CLIENT_HANDLE iotHubClientHandle;
    int receiveContext = 0;
    int transmitCounter = 0;
    
    pc.baud(115200); // Print quickly! 200Hz x line of output data!
    
    printf("\n\nFXOS8700CQ identity = %X\n", sfxos.getWhoAmI());
    
    msgLock = Lock_Init();  // TODO: Check error code
    
    sfxos.enable();
    sfxos.getData(reading);
    
    int rc;

    int LOOPCOUNT = -1;     // Set to -1 to run forever
    
    int localMsgCount;
    int *userContext;
    IOTHUB_MESSAGE_HANDLE msgHandle;
    int elapsedTime = 0;
    
    //char buffer[200];
    char buffer [1024]; //JH bigger buffer
    
    iotHubClientHandle = setupConnection(pc, connectionString, MQTT_Protocol, &receiveContext);
    
    while (LOOPCOUNT)
    {
        if (sfxos.getInt2Triggered())
        {
            sfxos.setInt2Triggered(false);
            sfxos.getData(reading);
            rc = readingToJSON(buffer, sizeof(buffer), reading);
            
            if (rc > sizeof(buffer))
                printf("ERROR: JSON buffer too small - require %d characters\n", rc);

            Lock(msgLock);
            localMsgCount = msgCount;
            Unlock(msgLock);
            
            if (localMsgCount < 2)
            {
                if ((msgHandle = IoTHubMessage_CreateFromByteArray((const unsigned char*)buffer, rc)) == NULL)
                {
                    (void)printf("ERROR: iotHubMessageHandle is NULL!\r\n");
                }
                else
                {
                    userContext = (int *) malloc(sizeof(userContext));
                    
                    if (userContext != NULL)
                    {
                        *userContext = transmitCounter;
    
                        if (IoTHubClient_SendEventAsync(iotHubClientHandle, msgHandle, SendConfirmationCallback, userContext) != IOTHUB_CLIENT_OK)
                        {
                            (void)printf("ERROR: IoTHubClient_LL_SendEventAsync..........FAILED!\r\n");
                        }
                        else
                        {
                            (void)printf("IoTHubClient_LL_SendEventAsync accepted message [%d] for transmission to IoT Hub.\r\n", (int)transmitCounter);
                        }
                        
                        IoTHubMessage_Destroy(msgHandle);
                        Lock(msgLock);
                        msgCount++;
                        t.start();
                        Unlock(msgLock);
                        
                        transmitCounter++;
                    }
                    else
                    {
                        (void)printf("ERROR: malloc - unable to allocate user context\r\n");
                    }
                }
            }
            else
            {
                (void)printf("Message dropped queue length %d\r\n", localMsgCount);
            }
            
            Lock(msgLock);
            elapsedTime = t.read_ms();
            Unlock(msgLock);
            
            if (elapsedTime > CONNECTIONTIMEOUT)
            {
                printf("No response for %d milliseconds - attempt reconnection\r\n", elapsedTime);
                NVIC_SystemReset(); // Just blow it all away
                terminateConnection(pc, iotHubClientHandle);
                iotHubClientHandle = setupConnection(pc, connectionString, MQTT_Protocol, &receiveContext);
                printf("Reconnection complete\r\n");
            }
                        
            if (LOOPCOUNT > 0)
                LOOPCOUNT--;
        }
        
        wait_ms(500);
    }
    
    printf("Loop complete - clean up\n");
    
    terminateConnection(pc, iotHubClientHandle);
    Lock_Deinit(msgLock);
    
    printf("Test complete\n");
    

    while(true) {
        pc.putc('.'); // idle dots
        wait(1.0);
    }
}