iot_water_monitor_v2

Dependencies:   easy-connect-v16 Watchdog FP MQTTPacket RecordType-v-16 watersenor_and_temp_code

Simple-MQTT/SimpleMQTT.h

Committer:
DuyLionTran
Date:
2018-04-03
Revision:
57:898fcb6692cd
Parent:
55:c405323f8d5a

File content as of revision 57:898fcb6692cd:

#ifndef __SIMPLEMQTT_H__
#define __SIMPLEMQTT_H__

/***************************************************************
 * Includes
 ***************************************************************/ 
#include "easy-connect.h"
#include "MQTTClient.h"
#include "NDefLib/NDefNfcTag.h"
#include "NDefLib/RecordType/RecordURI.h"
#include "MQTTNetwork.h"
#include "MQTTmbed.h"

#include "Json.h"
#include "CommandExecution.h"
#include "flash_programming.h"
/***************************************************************
 * Definitions
 ***************************************************************/
 // Configuration values needed to connect to IBM IoT Cloud
#define ORG                 MQTT_ORG_ID             // connect to ORG.internetofthings.ibmcloud.com/ For a registered connection, replace with your org
#define ID                  MQTT_DEVICE_ID          // For a registered connection is your device id
#define AUTH_TOKEN          MQTT_DEVICE_PASSWORD    // For a registered connection is a device auth-token
#define DEFAULT_TYPE_NAME   MQTT_DEVICE_TYPE        // For a registered connection is device type
#define AUTH_METHOD         MQTT_USERNAME

#define TYPE                DEFAULT_TYPE_NAME       // For a registered connection, replace with your type
#define IBM_IOT_PORT        MQTT_PORT

#define MQTT_MAX_PACKET_SIZE    450   
#define MQTT_MAX_PAYLOAD_SIZE   350 

/***************************************************************
 * Variables
 ***************************************************************/ 
/* Selects which group of UploadValue fields MQTT_PublishAll() sends.
 * Values are compared against the uint8_t uploadType argument of
 * MQTT_PublishAll(), so the numeric order below is part of the interface.
 */
typedef enum {
    ADC_VALUE = 0,      /* dispatched to MQTT_PublishADC()         */
    SENSOR_VALUE,       /* dispatched to MQTT_PublishSensorVal()   */
    RELAY_STATE,        /* dispatched to MQTT_PublishRelayState()  */
    CONFIG_VALUE        /* dispatched to MQTT_PublishConfigValue() */
} UploadType;

/* Command categories.
 * NOTE(review): none of these enumerators is referenced in this header, and
 * MQTT_SubscribeCallback() declares a local int that is also called
 * CommandType, which hides this type inside that function.
 */
typedef enum {
	CONTROL_CMD = 0,
	READ_CMD,
	SETUP_CMD
} CommandType;
 
/* Aggregate of every value that is published to, or configured from, the
 * MQTT server.
 *
 * NOTE(review): the trailing declarator after the closing brace also defines
 * a file-scope object named UploadValue in every translation unit that
 * includes this header. Because that object shares the struct's name, the
 * rest of the file has to spell the type as "struct UploadValue".
 */
struct UploadValue {
    float ADC_TEMPVal;          /* not read anywhere in this header */
    float ADC_DOVal;            /* sent by MQTT_PublishAll() for ADC_VALUE */
    
    float SENSOR_TEMPVal;       /* sent as "temperature0" for SENSOR_VALUE */
    float SENSOR_DOVal;         /* sent as "oxygen0" for SENSOR_VALUE */
    
    uint8_t   RELAY_State_1;    /* relay states: sent for RELAY_STATE and   */
    uint8_t   RELAY_State_2;    /* overwritten by the "set relay state"     */
    uint8_t   RELAY_State_3;    /* command in MQTT_SubscribeCallback()      */
        
    uint32_t CONFIG_Time;       /* not read anywhere in this header */
	
	uint8_t  CONFIG_Mode;                        /* relay commands are only applied while this is 0 */
	uint8_t  CONFIG_OxyThreshold;
	uint8_t  CONFIG_TemperatureThreshold;
	uint16_t CONFIG_UploadInterval;              /* in seconds; the config command stores received value * 60 */
	
	uint32_t CONFIG_AlarmTime;                   /* set by the "set relay alarm state" command */
    uint8_t  CONFIG_SetRelayState_1;             /* set by the "set relay alarm state" command */
    uint8_t  CONFIG_SetRelayState_2;	         /* set by the "set relay alarm state" command */
} UploadValue; 
 
/* NOTE(review): projectName and commandID below are non-static definitions in
 * a header, so including this file from more than one translation unit will
 * produce duplicate symbols; projectName also binds a string literal to a
 * non-const char pointer. The static variables get one private copy per
 * including translation unit.
 */
char       *projectName     = "WaterMonitor";
static char id[30]          = ID;               // device id used in the client id and subscription URL
static char org[12]         = ORG;              // organisation id, prefix of the broker hostname
static char type[30]        = TYPE;             // device type used in the client id
static char auth_token[30]  = AUTH_TOKEN;       // password sent in the MQTT CONNECT packet
static int  connack_rc      = 0;                // last return code stored by MQTT_Connect()
static bool wifiConnected   = true;             // not referenced elsewhere in this header
static bool netConnecting   = false;            // set while MQTT_Connect() opens the TCP connection
static bool mqttConnecting  = false;            // set while MQTT_Connect() performs the MQTT handshake
static bool netConnected    = false;            // set once the TCP connection succeeded
static bool connected       = false;            // set once the MQTT CONNECT succeeded; cleared by MQTT_AttemptConnect()
static int  retryAttempt    = 0;                // number of failed connection attempts counted by MQTT_AttemptConnect()
static int  connectTimeout  = 1000;             // not referenced elsewhere in this header
uint16_t    commandID       = 0;                // value emitted as "cmdId" in every published event
static char subscription_url[MQTT_MAX_PAYLOAD_SIZE];   // formatted by MQTT_Connect(), used only for logging there

/* Shared data set modified by MQTT_SubscribeCallback(); defined outside this header. */
extern struct UploadValue DataStruct;
/***************************************************************
 * Unity function declarations
 ***************************************************************/
/** brief       Declared here only; no definition is present in this header
 *  param[in]   ControlSignal
 *  retval      None
 */
void MQTT_MessageHandles(uint8_t ControlSignal); 
 
/** brief       Callback invoked when an MQTT message arrives; parses the
 *              payload as JSON and executes the command selected by its
 *              integer "type" member
 *  param[in]   msgMQTT     The received message
 *  retval      None 
 */ 
void MQTT_SubscribeCallback(MQTT::MessageData &msgMQTT); 

/** brief       Subscribe to an MQTT topic (QoS 1) and register
 *              MQTT_SubscribeCallback as its handler
 *  param[in]   subscribeTopic Topic to be subscribed
 *  param[in]   client         MQTT client 
 *  param[in]   uploadStruct   Not used by the implementation
 *  retval      returnCode from MQTTClient.h 
 */ 
int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, struct UploadValue uploadStruct);

/** brief       Open the TCP connection to the broker, perform the MQTT
 *              CONNECT and subscribe to MQTT_COMMAND_TOPIC
 *  param[in]   client          MQTT client 
 *  param[in]   mqttNetwork     MQTT network 
 *  param[in]   network         The internet network interface (ethernet, wifi...); only used for logging
 *  param[in]   uploadStruct    Forwarded to MQTT_Subscribe
 *  retval      Return code of the first step that failed, otherwise the
 *              return code of the subscription
 */ 
int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct);

/** brief       Get the delay to wait before the next connection attempt
 *  param[in]   attemptNumber The number of attempts made so far
 *  retval      3 for attempts below 10, 60 for attempts below 20, otherwise 600
 */ 
int MQTT_GetConnTimeout(int attemptNumber);

/** brief       Call MQTT_Connect repeatedly until it succeeds. Gives up
 *              immediately when the broker rejects the credentials and
 *              resets the system on the fifth failed attempt.
 *  param[in]   client          MQTT client 
 *  param[in]   mqttNetwork     MQTT network 
 *  param[in]   network         The internet network interface
 *  param[in]   uploadStruct    Forwarded to MQTT_Connect
 *  retval      None
 */ 
void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct);

/** brief       Publish an ADC value to the server
 *  param[in]   client        MQTT client 
 *  param[in]   inputTime     The time when the data is attempted to be sent 
 *  param[in]   adcVal_0      The ADC value to be sent
 *  retval      returnCode from MQTTClient.h 
 */
int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0);

/** brief       Publish sensor values to the server
 *  param[in]   client        MQTT client 
 *  param[in]   inputTime     The time when the data is attempted to be sent 
 *  param[in]   TempVal       The temperature value to be sent
 *  param[in]   DOVal         The oxygen value to be sent
 *  retval      returnCode from MQTTClient.h 
 */
int MQTT_PublishSensorVal(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float TempVal, float DOVal);

/** brief       Publish relay states to the server
 *  param[in]   client        MQTT client 
 *  param[in]   inputTime     The time when the data is attempted to be sent 
 *  param[in]   relay1        Relay 1 state
 *  param[in]   relay2        Relay 2 state
 *  param[in]   relay3        Relay 3 state
 *  retval      returnCode from MQTTClient.h 
 */
int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2, int relay3);

/** brief       Publish the configuration values to the server
 *  param[in]   client                  MQTT client 
 *  param[in]   inputTime               The time when the data is attempted to be sent 
 *  param[in]   mode                    Current mode: automatic (0) or manual (1)
 *  param[in]   OxyThreshold            Oxygen threshold
 *  param[in]   TemperatureThreshold    Temperature threshold
 *  param[in]   UploadInterval          Interval between upload turns
 *  retval      returnCode from MQTTClient.h 
 */
int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t OxyThreshold, uint8_t TemperatureThreshold, uint16_t UploadInterval);

/** brief       Publish one group of values, selected by uploadType, to the MQTT server
 *  param[in]   client          MQTT client 
 *  param[in]   inputTime       The time when the data is attempted to be sent 
 *  param[in]   uploadType      One of the UploadType values
 *  param[in]   uploadStruct    Source of the values to be sent
 *  retval      returnCode from MQTTClient.h 
 */
int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct);

/** brief       Publish the device-management metadata to MQTT_MANAGE_TOPIC
 *  param[in]   client          MQTT client 
 *  param[in]   inputTime       Not part of the published payload
 *  param[in]   uploadPeriod    Sent as "Upload Interval"
 *  param[in]   sendTimePeriod  Not part of the published payload
 *  retval      returnCode from MQTTClient.h 
 */
int MQTT_PublishDeviceManage(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint32_t uploadPeriod, uint32_t sendTimePeriod);
/********************************************************************************************************************************************************************************************/
/***************************************************************
 * Unity function definitions
 ***************************************************************/ 
void MQTT_SubscribeCallback(MQTT::MessageData &msgMQTT) {
    // Message Handles
    char msg[MQTT_MAX_PAYLOAD_SIZE];
    msg[0]='\0';
    strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen);
    printf ("--->>> MQTT_SubscribeCallback msg: %s\n\r", msg);
    /* {"type":"3","deviceId":"string"} */
    Json json(msg, msgMQTT.message.payloadlen);
    if (!json.isValidJson()) {
        printf("Invalid JSON: %s", msg);
    }
    else {
        if (json.type(0) != JSMN_OBJECT ) {
            printf("Invalid JSON. ROOT element is not Object: %s", msg);
        }
        else {
            int CommandType;
            int KeyIndex      = json.findKeyIndexIn("type", 0);
            int KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            int ret = json.tokenIntegerValue(KeyValueIndex, CommandType); 
            
            int receiveCmdID;
            printf("Command Type: %d, error %d\r\n", CommandType, ret);
            
            switch (CommandType) {
            	case 0: printf("Calibrate\r\n");
            			CE_Calibrate();
            	break;
            	
            	case 1: printf("Set relay alarm state\r\n");
            			int AlarmTime, SetRelayState1, SetRelayState2;
            			KeyIndex      = json.findKeyIndexIn("cmdID", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID); 
            			
            			KeyIndex      = json.findKeyIndexIn("AlarmTime", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, AlarmTime); 
            			
            			KeyIndex      = json.findKeyIndexIn("SetRelayState1", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, SetRelayState1);
            			
            			KeyIndex      = json.findKeyIndexIn("SetRelayState2", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, SetRelayState2);
            			
            			DataStruct.CONFIG_AlarmTime       = AlarmTime;
            			DataStruct.CONFIG_SetRelayState_1 = SetRelayState1;
            			DataStruct.CONFIG_SetRelayState_2 = SetRelayState2;
            			FP_SetAlarmValues(DataStruct.CONFIG_AlarmTime, DataStruct.CONFIG_SetRelayState_1, DataStruct.CONFIG_SetRelayState_2);            			
            	break;
            	
            	case 2: printf("Set relay state\r\n");
            			int relayState1, relayState2, relayState3;
            			KeyIndex      = json.findKeyIndexIn("cmdID", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID); 
            			
            			KeyIndex      = json.findKeyIndexIn("relayState0", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, relayState1); 
            			
            			KeyIndex      = json.findKeyIndexIn("relayState1", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, relayState2);
            			
            			KeyIndex      = json.findKeyIndexIn("relayState2", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, relayState3);
            			
            			if (DataStruct.CONFIG_Mode == 0) {
	             			DataStruct.RELAY_State_1 = relayState1;     
	            			DataStruct.RELAY_State_2 = relayState2;   
	            			DataStruct.RELAY_State_3 = relayState3;     
	            			FP_WriteRelayStates(DataStruct.RELAY_State_1, DataStruct.RELAY_State_2, DataStruct.RELAY_State_3);      		
	            			CE_HandleRelays(relayState1, relayState2, relayState3);       				
            			}
  
            	break;
            	
            	case 3: printf("Set config values\r\n");
            			int mode, OxiThres, TempThres, uploadInterval;
            			KeyIndex      = json.findKeyIndexIn("cmdID", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID); 
            			
            			KeyIndex      = json.findKeyIndexIn("mode", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, mode);     
            			        			
            			KeyIndex      = json.findKeyIndexIn("OxygenThreshold", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, OxiThres); 
            			
            			KeyIndex      = json.findKeyIndexIn("TemperatureThreshold", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, TempThres);     
            		
            			KeyIndex      = json.findKeyIndexIn("uploadInterval", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, uploadInterval);  
            		            			
            			DataStruct.CONFIG_Mode   		 		= mode;   
            			DataStruct.CONFIG_OxyThreshold 		 	= OxiThres;   
            			DataStruct.CONFIG_TemperatureThreshold	= TempThres;   
            			DataStruct.CONFIG_UploadInterval 		= uploadInterval * 60;	
            			printf("OMode %d\r\n", DataStruct.CONFIG_Mode );		
            			printf("Oxygen Threshold*10 %dppm(*100)\r\n", DataStruct.CONFIG_OxyThreshold);	
            			printf("Temperature Threshold %d*C\r\n", DataStruct.CONFIG_TemperatureThreshold);	
            			printf("Periodic upload time %ds\r\n", DataStruct.CONFIG_UploadInterval);	
            			FP_WriteConfigValues(DataStruct.CONFIG_Mode, DataStruct.CONFIG_OxyThreshold, DataStruct.CONFIG_TemperatureThreshold, DataStruct.CONFIG_UploadInterval);  	

            	break;
            	
            	case 4: printf("Synchronize data\r\n");
            			CE_UpdateImmediately();
            	case 5: printf("Set time\r\n");
            			int setRTCTime;
            			KeyIndex      = json.findKeyIndexIn("cmdID", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID); 
            			
            			KeyIndex      = json.findKeyIndexIn("setRTCTime", 0);
            			KeyValueIndex = json.findChildIndexOf(KeyIndex, 0);
            			ret = json.tokenIntegerValue(KeyValueIndex, setRTCTime);  
            			
            			printf("Epoch time %d\r\n", (uint32_t)setRTCTime);             			
            			CE_SetRTCTime((uint32_t)setRTCTime);		
            			
            	break;
            	
            	default: break;
            }
        }    	
    }    
}

/** brief       Register MQTT_SubscribeCallback as the QoS 1 handler of a topic
 *  param[in]   subscribeTopic Topic to be subscribed
 *  param[in]   client         MQTT client
 *  param[in]   uploadStruct   Accepted for interface compatibility; not used
 *  retval      returnCode of MQTT::Client::subscribe
 */
int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, struct UploadValue uploadStruct) {
    int subscribeResult = client->subscribe(subscribeTopic, MQTT::QOS1, MQTT_SubscribeCallback);

    return subscribeResult;
}

/** brief       Open the TCP connection to the broker, perform the MQTT
 *              CONNECT and subscribe to MQTT_COMMAND_TOPIC.
 *              The progress flags netConnecting / mqttConnecting are cleared
 *              on every exit path, including failures.
 *  param[in]   client          MQTT client
 *  param[in]   mqttNetwork     MQTT network
 *  param[in]   network         The internet network interface; only used for logging
 *  param[in]   uploadStruct    Forwarded to MQTT_Subscribe
 *  retval      Return code of the TCP connect or MQTT connect when one of
 *              them fails, otherwise the return code of the subscription.
 *              connack_rc is updated with the MQTT connect / subscribe code.
 */
int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct) {
    const char* iot_ibm = MQTT_BROKER_URL;
    char hostname[strlen(org) + strlen(iot_ibm) + 1];

    snprintf(hostname, sizeof(hostname), "%s%s", org, iot_ibm);
    // Construct clientId - d:org:type:id ("d:" + two ':' + terminator = 5 extra bytes)
    char clientId[strlen(org) + strlen(type) + strlen(id) + 5];
    snprintf(clientId, sizeof(clientId), "d:%s:%s:%s", org, type, id);
    // subscription_url is only logged below; bound the write to the buffer size
    snprintf(subscription_url, sizeof(subscription_url), "%s.%s/#/device/%s/%s/", org, "internetofthings.ibmcloud.com", id, DEFAULT_TYPE_NAME);

    // Network debug statements
    LOG("=====================================\n\r");
    LOG("Nucleo IP ADDRESS: %s\n\r", network->get_ip_address());
    LOG("Nucleo MAC ADDRESS: %s\n\r", network->get_mac_address());
    LOG("Server Hostname: %s port: %d\n\r", hostname, IBM_IOT_PORT);
    LOG("Client ID: %s\n\r", clientId);
    LOG("Topic: %s\n\r",MQTT_EVENT_TOPIC);
    LOG("Subscription URL: %s\n\r", subscription_url);
    LOG("=====================================\n\r");

    // TCP connect
    netConnecting = true;
    int rc = mqttNetwork->connect(hostname, IBM_IOT_PORT);
    netConnecting = false;
    if (rc != 0) {
        printf("rc from TCP connect is %d\r\n", rc);
        return rc;
    }

    printf ("--->TCP Connected\n\r");
    netConnected    = true;

    // MQTT Connect
    mqttConnecting = true;
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion            = 4;
    data.struct_version         = 0;
    data.clientID.cstring       = clientId;
    data.keepAliveInterval      = MQTT_KEEPALIVE;  // in Sec
    data.username.cstring       = AUTH_METHOD;
    data.password.cstring       = auth_token;
    // The auth token is deliberately not printed: it is a credential.

    if ((rc = client->connect(data)) != 0) {
        printf("rc from MQTT connect is %d\r\n", rc);
        mqttConnecting = false;
        connack_rc = rc;
        return rc;
    }
    connected = true;
    printf ("--->MQTT Connected\n\r");
    if ((rc = MQTT_Subscribe(MQTT_COMMAND_TOPIC, client, uploadStruct)) == 0) {
        LOG ("--->>>MQTT subscribed to: %s\n\r", MQTT_COMMAND_TOPIC);
    } else {
        LOG ("--->>>ERROR MQTT subscribe : %s\n\r", MQTT_COMMAND_TOPIC);
    }
    mqttConnecting = false;
    connack_rc = rc;
    return rc;
}


/** brief       Get the delay to wait before the next connection attempt
 *  param[in]   attemptNumber The number of attempts made so far
 *  retval      3 while attemptNumber is below 10, 60 while it is below 20,
 *              600 afterwards
 */
int MQTT_GetConnTimeout(int attemptNumber) {
    if (attemptNumber >= 20) {
        return 600;
    }
    if (attemptNumber >= 10) {
        return 60;
    }
    return 3;
}


/** brief       Keep calling MQTT_Connect until it reports
 *              MQTT_CONNECTION_ACCEPTED. Returns early, without retrying,
 *              when connack_rc shows that the credentials were rejected.
 *              Every failed attempt increments retryAttempt; the fifth one
 *              resets the system, the others wait for the delay given by
 *              MQTT_GetConnTimeout.
 *  param[in]   client          MQTT client
 *  param[in]   mqttNetwork     MQTT network
 *  param[in]   network         The internet network interface
 *  param[in]   uploadStruct    Forwarded to MQTT_Connect
 *  retval      None
 */
void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct) {
    connected = false;

    for (;;) {
        if (MQTT_Connect(client, mqttNetwork, network, uploadStruct) == MQTT_CONNECTION_ACCEPTED) {
            break;
        }

        // Wrong credentials will not get better by retrying
        bool credentialsRejected = (connack_rc == MQTT_NOT_AUTHORIZED) ||
                                   (connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD);
        if (credentialsRejected) {
            printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc);
            return;
        }

        retryAttempt += 1;
        int delay = MQTT_GetConnTimeout(retryAttempt);
        WARN("Retry attempt number %d waiting %d\n", retryAttempt, delay);

        // The client and network objects cannot be rebuilt from here, so the
        // whole system is reset once the retry count reaches the threshold.
        if (retryAttempt != 5) {
            wait(delay);
            continue;
        }
        NVIC_SystemReset();
    }
}

/** brief       Publish an ADC value to MQTT_EVENT_TOPIC as a "type":1 event
 *  param[in]   client        MQTT client
 *  param[in]   inputTime     Time stamp, sent formatted as "YYYY/MM/DD hh:mm:ss"
 *  param[in]   adcVal_0      The ADC value to be sent as "adc0"
 *  retval      MQTT::FAILURE when the client is not connected or the message
 *              does not fit, otherwise the returnCode of MQTT::Client::publish
 */
int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0) {
    MQTT::Message message;
    const char* pubTopic = MQTT_EVENT_TOPIC;

    char buf[MQTT_MAX_PAYLOAD_SIZE];
    char timeBuf[50];

    if (!client->isConnected()) {
        printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE;
    }

    // localtime() may fail and strftime() leaves the buffer unspecified when
    // it returns 0; fall back to an empty time string in both cases.
    struct tm *brokenDown = localtime(&inputTime);
    if (brokenDown == NULL || strftime(timeBuf, sizeof(timeBuf), "%Y/%m/%d %H:%M:%S", brokenDown) == 0) {
        timeBuf[0] = '\0';
    }

    int written = snprintf(buf, sizeof(buf), "{\"type\":1,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"adc0\":%.1f}",
                           timeBuf, commandID, adcVal_0);
    // A truncated payload would be invalid JSON, and a packet that exceeds
    // the client buffer cannot be serialized, so neither is sent.
    if (written < 0 || (size_t)written >= sizeof(buf) ||
        ((size_t)written + strlen(pubTopic) + 1) >= MQTT_MAX_PACKET_SIZE) {
        printf("message too long!\r\n");
        return MQTT::FAILURE;
    }

    message.qos        = MQTT::QOS0;
    message.retained   = false;
    message.dup        = false;
    message.payload    = (void*)buf;
    message.payloadlen = (size_t)written;

    LOG("Publishing %s\n\r", buf);
    return client->publish(pubTopic, message);
}

/** brief       Publish sensor values to MQTT_EVENT_TOPIC as a "type":2 event
 *  param[in]   client        MQTT client
 *  param[in]   inputTime     Time stamp, sent as an integer
 *  param[in]   TempVal       Sent as "temperature0"
 *  param[in]   DOVal         Sent as "oxygen0"
 *  retval      MQTT::FAILURE when the client is not connected or the message
 *              does not fit, otherwise the returnCode of MQTT::Client::publish
 */
int MQTT_PublishSensorVal(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float TempVal, float DOVal) {
    MQTT::Message message;
    const char* pubTopic = MQTT_EVENT_TOPIC;

    char buf[MQTT_MAX_PAYLOAD_SIZE];

    if (!client->isConnected()) {
        printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE;
    }

    // time_t is not guaranteed to be an int; convert explicitly so that the
    // conversion specifier matches the argument on every toolchain.
    int written = snprintf(buf, sizeof(buf), "{\"type\":2,\"deviceId\":\"PROEVN\",\"time\":%ld,\"cmdId\":%d,\"temperature0\":%.2f,\"oxygen0\":%.2f}",
                           (long)inputTime, commandID, TempVal ,DOVal);
    // A truncated payload would be invalid JSON, and a packet that exceeds
    // the client buffer cannot be serialized, so neither is sent.
    if (written < 0 || (size_t)written >= sizeof(buf) ||
        ((size_t)written + strlen(pubTopic) + 1) >= MQTT_MAX_PACKET_SIZE) {
        printf("message too long!\r\n");
        return MQTT::FAILURE;
    }

    message.qos        = MQTT::QOS0;
    message.retained   = false;
    message.dup        = false;
    message.payload    = (void*)buf;
    message.payloadlen = (size_t)written;

    LOG("Publishing %s\n\r", buf);
    return client->publish(pubTopic, message);
}

/** brief       Publish relay states to MQTT_EVENT_TOPIC as a "type":3 event
 *  param[in]   client        MQTT client
 *  param[in]   inputTime     Time stamp, sent as an integer
 *  param[in]   relay1        Relay 1 state
 *  param[in]   relay2        Relay 2 state
 *  param[in]   relay3        Relay 3 state
 *  retval      MQTT::FAILURE when the client is not connected or the message
 *              does not fit, otherwise the returnCode of MQTT::Client::publish
 */
int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2, int relay3) {
    MQTT::Message message;
    const char* pubTopic = MQTT_EVENT_TOPIC;
    char buf[MQTT_MAX_PAYLOAD_SIZE];

    if (!client->isConnected()) {
        printf ("---> MQTT DISCONNECTED\n\r");
        return MQTT::FAILURE;
    }

    // time_t is not guaranteed to be an int; convert explicitly so that the
    // conversion specifier matches the argument on every toolchain.
    int written = snprintf(buf, sizeof(buf), "{\"type\":3,\"deviceId\":\"PROEVN\",\"time\":%ld,\"cmdId\":%d,\"relay1\":%d,\"relay2\":%d,\"relay3\":%d}",
                           (long)inputTime, commandID, relay1, relay2, relay3);
    // A truncated payload would be invalid JSON, and a packet that exceeds
    // the client buffer cannot be serialized, so neither is sent.
    if (written < 0 || (size_t)written >= sizeof(buf) ||
        ((size_t)written + strlen(pubTopic) + 1) >= MQTT_MAX_PACKET_SIZE) {
        printf("message too long!\r\n");
        return MQTT::FAILURE;
    }

    message.qos        = MQTT::QOS0;
    message.retained   = false;
    message.dup        = false;
    message.payload    = (void*)buf;
    message.payloadlen = (size_t)written;

    LOG("Publishing %s\n\r", buf);
    return client->publish(pubTopic, message);
}

/** brief       Publish the configuration values to MQTT_EVENT_TOPIC as a
 *              "type":4 event
 *  param[in]   client                  MQTT client
 *  param[in]   inputTime               Time stamp, sent as an integer
 *  param[in]   mode                    Sent as "mode"
 *  param[in]   OxyThreshold            Sent as "OxygenThreshold"
 *  param[in]   TemperatureThreshold    Sent as "TemperatureThreshold"
 *  param[in]   UploadInterval          Sent as "uploadInterval"
 *  retval      MQTT::FAILURE when the client is not connected or the message
 *              does not fit, otherwise the returnCode of MQTT::Client::publish
 */
int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, 
                                                    uint8_t OxyThreshold, uint8_t TemperatureThreshold, uint16_t UploadInterval) {
    MQTT::Message message;
    const char* pubTopic = MQTT_EVENT_TOPIC;
    char buf[MQTT_MAX_PAYLOAD_SIZE];

    if (!client->isConnected()) {
        printf ("---> MQTT DISCONNECTED\n\r");
        return MQTT::FAILURE;
    }

    // time_t is not guaranteed to be an int; convert explicitly so that the
    // conversion specifier matches the argument on every toolchain.
    int written = snprintf(buf, sizeof(buf), "{\"type\":4,\"deviceId\":\"PROEVN\",\"time\":%ld,\"cmdId\":%d,\"mode\":%d,\"OxygenThreshold\":%d,\"TemperatureThreshold\":%d,\"uploadInterval\":%d}",
                           (long)inputTime, commandID, mode, OxyThreshold, TemperatureThreshold, UploadInterval);
    // A truncated payload would be invalid JSON, and a packet that exceeds
    // the client buffer cannot be serialized, so neither is sent.
    if (written < 0 || (size_t)written >= sizeof(buf) ||
        ((size_t)written + strlen(pubTopic) + 1) >= MQTT_MAX_PACKET_SIZE) {
        printf("message too long!\r\n");
        return MQTT::FAILURE;
    }

    message.qos        = MQTT::QOS0;
    message.retained   = false;
    message.dup        = false;
    message.payload    = (void*)buf;
    message.payloadlen = (size_t)written;

    LOG("Publishing %s\n\r", buf);
    return client->publish(pubTopic, message);
}

/** brief       Publish one group of values, selected by uploadType
 *  param[in]   client          MQTT client
 *  param[in]   inputTime       Time stamp forwarded to the publish function
 *  param[in]   uploadType      One of the UploadType values
 *  param[in]   uploadStruct    Source of the values to be sent
 *  retval      Return code of the selected publish function, or
 *              MQTT::FAILURE when uploadType is not a known UploadType
 */
int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct) {
    // Start from FAILURE so that an unknown uploadType never returns an
    // indeterminate value.
    int retVal = MQTT::FAILURE;

    switch (uploadType) {
        case (ADC_VALUE):
            retVal = MQTT_PublishADC(client, inputTime, uploadStruct.ADC_DOVal);
            break;
        case (SENSOR_VALUE):
            retVal = MQTT_PublishSensorVal(client, inputTime, uploadStruct.SENSOR_TEMPVal ,uploadStruct.SENSOR_DOVal);
            break;
        case (RELAY_STATE):
            retVal = MQTT_PublishRelayState(client, inputTime, uploadStruct.RELAY_State_1, uploadStruct.RELAY_State_2, uploadStruct.RELAY_State_3);
            break;
        case (CONFIG_VALUE):
            retVal = MQTT_PublishConfigValue(client, inputTime, uploadStruct.CONFIG_Mode, uploadStruct.CONFIG_OxyThreshold, uploadStruct.CONFIG_TemperatureThreshold, uploadStruct.CONFIG_UploadInterval);
            break;
        default:
            break;
    }
    return retVal;
}

/** brief       Publish the device-management metadata to MQTT_MANAGE_TOPIC
 *  param[in]   client          MQTT client
 *  param[in]   inputTime       Kept for interface compatibility; not part of the payload
 *  param[in]   uploadPeriod    Sent as "Upload Interval"
 *  param[in]   sendTimePeriod  Kept for interface compatibility; not part of the payload
 *  retval      MQTT::FAILURE when the client is not connected or the message
 *              does not fit, otherwise the returnCode of MQTT::Client::publish
 */
int MQTT_PublishDeviceManage(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint32_t uploadPeriod, uint32_t sendTimePeriod) {
    MQTT::Message message;
    const char* pubTopic = MQTT_MANAGE_TOPIC;
    char buf[MQTT_MAX_PAYLOAD_SIZE];

    (void)inputTime;
    (void)sendTimePeriod;

    if (!client->isConnected()) {
        printf ("---> MQTT DISCONNECTED\n\r");
        return MQTT::FAILURE;
    }

    // uint32_t is not guaranteed to be an int; convert explicitly so that the
    // conversion specifier matches the argument on every toolchain.
    int written = snprintf(buf, sizeof(buf), "{\"d\":{\"metadata\":{\"Upload Interval\":%lu},\"supports\":{\"deviceActions\":true}}}",
                           (unsigned long)uploadPeriod);
    // A truncated payload would be invalid JSON, and a packet that exceeds
    // the client buffer cannot be serialized, so neither is sent.
    if (written < 0 || (size_t)written >= sizeof(buf) ||
        ((size_t)written + strlen(pubTopic) + 1) >= MQTT_MAX_PACKET_SIZE) {
        printf("message too long!\r\n");
        return MQTT::FAILURE;
    }

    message.qos        = MQTT::QOS0;
    message.retained   = false;
    message.dup        = false;
    message.payload    = (void*)buf;
    message.payloadlen = (size_t)written;

    LOG("Publishing %s\n\r", buf);
    return client->publish(pubTopic, message);
}

#endif /* __SIMPLEMQTT_H__ */