Demo application for using the AT&T IoT Starter Kit Powered by AWS.
Dependencies: SDFileSystem
Fork of ATT_AWS_IoT_demo by
aws_iot_shadow_records.cpp
00001 /* 00002 * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"). 00005 * You may not use this file except in compliance with the License. 00006 * A copy of the License is located at 00007 * 00008 * http://aws.amazon.com/apache2.0 00009 * 00010 * or in the "license" file accompanying this file. This file is distributed 00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 00012 * express or implied. See the License for the specific language governing 00013 * permissions and limitations under the License. 00014 */ 00015 00016 #include "aws_iot_shadow_records.h" 00017 00018 #include <string.h> 00019 #include <stdio.h> 00020 00021 #include "timer_interface.h" 00022 #include "aws_iot_json_utils.h" 00023 #include "aws_iot_log.h" 00024 #include "aws_iot_shadow_json.h" 00025 #include "aws_iot_config.h" 00026 00027 typedef struct { 00028 char clientTokenID[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE]; 00029 char thingName[MAX_SIZE_OF_THING_NAME]; 00030 ShadowActions_t action; 00031 fpActionCallback_t callback; 00032 void *pCallbackContext; 00033 bool isFree; 00034 Timer timer; 00035 } ToBeReceivedAckRecord_t; 00036 00037 typedef struct { 00038 const char *pKey; 00039 void *pStruct; 00040 jsonStructCallback_t callback; 00041 bool isFree; 00042 } JsonTokenTable_t; 00043 00044 typedef struct { 00045 char Topic[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00046 uint8_t count; 00047 bool isFree; 00048 bool isSticky; 00049 } SubscriptionRecord_t; 00050 00051 typedef enum { 00052 SHADOW_ACCEPTED, SHADOW_REJECTED, SHADOW_ACTION 00053 } ShadowAckTopicTypes_t; 00054 00055 ToBeReceivedAckRecord_t AckWaitList[MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME]; 00056 00057 MQTTClient_t *pMqttClient; 00058 00059 char myThingName[MAX_SIZE_OF_THING_NAME]; 00060 char mqttClientID[MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES]; 00061 00062 char shadowDeltaTopic[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00063 00064 #define MAX_TOPICS_AT_ANY_GIVEN_TIME 2*MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME 00065 SubscriptionRecord_t SubscriptionList[MAX_TOPICS_AT_ANY_GIVEN_TIME]; 00066 00067 #define SUBSCRIBE_SETTLING_TIME 2 00068 char shadowRxBuf[SHADOW_MAX_SIZE_OF_RX_BUFFER]; 00069 00070 static JsonTokenTable_t tokenTable[MAX_JSON_TOKEN_EXPECTED]; 00071 static uint32_t tokenTableIndex = 0; 00072 static bool deltaTopicSubscribedFlag = false; 00073 uint32_t shadowJsonVersionNum = 0; 00074 bool shadowDiscardOldDeltaFlag = true; 00075 00076 // local helper functions 00077 static int AckStatusCallback(MQTTCallbackParams params); 00078 static int shadow_delta_callback(MQTTCallbackParams params); 00079 static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action, 00080 ShadowAckTopicTypes_t ackType); 00081 static int16_t getNextFreeIndexOfSubscriptionList(void); 00082 static void unsubscribeFromAcceptedAndRejected(uint8_t index); 00083 00084 void initDeltaTokens(void) { 00085 uint32_t i; 00086 for (i = 0; i < MAX_JSON_TOKEN_EXPECTED; i++) { 00087 tokenTable[i].isFree = true; 00088 } 00089 tokenTableIndex = 0; 00090 deltaTopicSubscribedFlag = false; 00091 } 00092 00093 IoT_Error_t registerJsonTokenOnDelta(jsonStruct_t *pStruct) { 00094 00095 IoT_Error_t rc = NONE_ERROR; 00096 00097 if (!deltaTopicSubscribedFlag) { 00098 MQTTSubscribeParams subParams; 00099 subParams.mHandler = shadow_delta_callback; 00100 snprintf(shadowDeltaTopic,MAX_SHADOW_TOPIC_LENGTH_BYTES, "$aws/things/%s/shadow/update/delta", myThingName); 00101 subParams.pTopic = shadowDeltaTopic; 00102 subParams.qos = QOS_0; 00103 rc = pMqttClient->subscribe(&subParams); 00104 DEBUG("delta topic %s", shadowDeltaTopic); 00105 deltaTopicSubscribedFlag = true; 00106 } 00107 00108 if (tokenTableIndex >= MAX_JSON_TOKEN_EXPECTED) { 00109 return GENERIC_ERROR; 00110 } 00111 00112 tokenTable[tokenTableIndex].pKey = pStruct->pKey; 00113 tokenTable[tokenTableIndex].callback = pStruct->cb; 00114 tokenTable[tokenTableIndex].pStruct = pStruct; 00115 tokenTable[tokenTableIndex].isFree = false; 00116 tokenTableIndex++; 00117 00118 return rc; 00119 } 00120 00121 static int16_t getNextFreeIndexOfSubscriptionList(void) { 00122 uint8_t i; 00123 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { 00124 if (SubscriptionList[i].isFree) { 00125 SubscriptionList[i].isFree = false; 00126 return i; 00127 } 00128 } 00129 return -1; 00130 } 00131 00132 static void topicNameFromThingAndAction(char *pTopic, const char *pThingName, ShadowActions_t action, 00133 ShadowAckTopicTypes_t ackType) { 00134 00135 char actionBuf[10]; 00136 char ackTypeBuf[10]; 00137 00138 if (action == SHADOW_GET) { 00139 strcpy(actionBuf, "get"); 00140 } else if (action == SHADOW_UPDATE) { 00141 strcpy(actionBuf, "update"); 00142 } else if (action == SHADOW_DELETE) { 00143 strcpy(actionBuf, "delete"); 00144 } 00145 00146 if (ackType == SHADOW_ACCEPTED) { 00147 strcpy(ackTypeBuf, "accepted"); 00148 } else if (ackType == SHADOW_REJECTED) { 00149 strcpy(ackTypeBuf, "rejected"); 00150 } 00151 00152 if (ackType == SHADOW_ACTION) { 00153 sprintf(pTopic, "$aws/things/%s/shadow/%s", pThingName, actionBuf); 00154 } else { 00155 sprintf(pTopic, "$aws/things/%s/shadow/%s/%s", pThingName, actionBuf, ackTypeBuf); 00156 } 00157 } 00158 00159 static bool isAckForMyThingName(const char *pTopicName) { 00160 if (strstr(pTopicName, myThingName) != NULL && ((strstr(pTopicName, "get/accepted") != NULL) || (strstr(pTopicName, "delta") != NULL))) { 00161 return true; 00162 } 00163 return false; 00164 } 00165 00166 static int AckStatusCallback(MQTTCallbackParams params) { 00167 int32_t tokenCount; 00168 int32_t i; 00169 void *pJsonHandler; 00170 char temporaryClientToken[MAX_SIZE_CLIENT_ID_WITH_SEQUENCE]; 00171 00172 if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) { 00173 return GENERIC_ERROR; 00174 } 00175 00176 memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen); 00177 shadowRxBuf[params.MessageParams.PayloadLen] = '\0'; // jsmn_parse relies on a string 00178 00179 if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) { 00180 WARN("Received JSON is not valid"); 00181 return GENERIC_ERROR; 00182 } 00183 00184 if (isAckForMyThingName(params.pTopicName)) { 00185 uint32_t tempVersionNumber = 0; 00186 if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) { 00187 if (tempVersionNumber > shadowJsonVersionNum) { 00188 shadowJsonVersionNum = tempVersionNumber; 00189 } 00190 } 00191 } 00192 00193 if (extractClientToken(shadowRxBuf, temporaryClientToken)) { 00194 for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { 00195 if (!AckWaitList[i].isFree) { 00196 if (strcmp(AckWaitList[i].clientTokenID, temporaryClientToken) == 0) { 00197 Shadow_Ack_Status_t status; 00198 if (strstr(params.pTopicName, "accepted") != NULL) { 00199 status = SHADOW_ACK_ACCEPTED; 00200 } else if (strstr(params.pTopicName, "rejected") != NULL) { 00201 status = SHADOW_ACK_REJECTED; 00202 } 00203 if (status == SHADOW_ACK_ACCEPTED || status == SHADOW_ACK_REJECTED) { 00204 if (AckWaitList[i].callback != NULL) { 00205 AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, status, 00206 shadowRxBuf, AckWaitList[i].pCallbackContext); 00207 } 00208 unsubscribeFromAcceptedAndRejected(i); 00209 AckWaitList[i].isFree = true; 00210 return NONE_ERROR; 00211 } 00212 } 00213 } 00214 } 00215 } 00216 00217 return GENERIC_ERROR; 00218 } 00219 00220 static int16_t findIndexOfSubscriptionList(const char *pTopic) { 00221 uint8_t i; 00222 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { 00223 if (!SubscriptionList[i].isFree) { 00224 if ((strcmp(pTopic, SubscriptionList[i].Topic) == 0)) { 00225 return i; 00226 } 00227 } 00228 } 00229 return -1; 00230 } 00231 00232 static void unsubscribeFromAcceptedAndRejected(uint8_t index) { 00233 00234 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00235 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00236 IoT_Error_t ret_val = NONE_ERROR; 00237 00238 topicNameFromThingAndAction(TemporaryTopicNameAccepted, AckWaitList[index].thingName, AckWaitList[index].action, 00239 SHADOW_ACCEPTED); 00240 topicNameFromThingAndAction(TemporaryTopicNameRejected, AckWaitList[index].thingName, AckWaitList[index].action, 00241 SHADOW_REJECTED); 00242 00243 int16_t indexSubList; 00244 00245 indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameAccepted); 00246 if ((indexSubList >= 0)) { 00247 if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) { 00248 ret_val = pMqttClient->unsubscribe(TemporaryTopicNameAccepted); 00249 if (ret_val == NONE_ERROR) { 00250 SubscriptionList[indexSubList].isFree = true; 00251 } 00252 } else if (SubscriptionList[indexSubList].count > 1) { 00253 SubscriptionList[indexSubList].count--; 00254 } 00255 } 00256 00257 indexSubList = findIndexOfSubscriptionList(TemporaryTopicNameRejected); 00258 if ((indexSubList >= 0)) { 00259 if (!SubscriptionList[indexSubList].isSticky && (SubscriptionList[indexSubList].count == 1)) { 00260 ret_val = pMqttClient->unsubscribe(TemporaryTopicNameRejected); 00261 if (ret_val == NONE_ERROR) { 00262 SubscriptionList[indexSubList].isFree = true; 00263 } 00264 } else if (SubscriptionList[indexSubList].count > 1) { 00265 SubscriptionList[indexSubList].count--; 00266 } 00267 } 00268 } 00269 00270 void initializeRecords(MQTTClient_t *pClient) { 00271 uint8_t i; 00272 for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { 00273 AckWaitList[i].isFree = true; 00274 } 00275 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { 00276 SubscriptionList[i].isFree = true; 00277 SubscriptionList[i].count = 0; 00278 SubscriptionList[i].isSticky = false; 00279 } 00280 pMqttClient = pClient; 00281 } 00282 00283 bool isSubscriptionPresent(const char *pThingName, ShadowActions_t action) { 00284 00285 uint8_t i = 0; 00286 bool isAcceptedPresent = false; 00287 bool isRejectedPresent = false; 00288 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00289 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00290 00291 topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED); 00292 topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED); 00293 00294 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { 00295 if (!SubscriptionList[i].isFree) { 00296 if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0)) { 00297 isAcceptedPresent = true; 00298 } else if ((strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) { 00299 isRejectedPresent = true; 00300 } 00301 } 00302 } 00303 00304 if (isRejectedPresent && isAcceptedPresent) { 00305 return true; 00306 } 00307 00308 return false; 00309 } 00310 00311 IoT_Error_t subscribeToShadowActionAcks(const char *pThingName, ShadowActions_t action, bool isSticky) { 00312 IoT_Error_t ret_val = NONE_ERROR; 00313 MQTTSubscribeParams subParams = MQTTSubscribeParamsDefault; 00314 00315 bool clearBothEntriesFromList = true; 00316 int16_t indexAcceptedSubList = 0; 00317 int16_t indexRejectedSubList = 0; 00318 indexAcceptedSubList = getNextFreeIndexOfSubscriptionList(); 00319 indexRejectedSubList = getNextFreeIndexOfSubscriptionList(); 00320 00321 if (indexAcceptedSubList >= 0 && indexRejectedSubList >= 0) { 00322 topicNameFromThingAndAction(SubscriptionList[indexAcceptedSubList].Topic, pThingName, action, SHADOW_ACCEPTED); 00323 subParams.mHandler = AckStatusCallback; 00324 subParams.qos = QOS_0; 00325 subParams.pTopic = SubscriptionList[indexAcceptedSubList].Topic; 00326 ret_val = pMqttClient->subscribe(&subParams); 00327 if (ret_val == NONE_ERROR) { 00328 SubscriptionList[indexAcceptedSubList].count = 1; 00329 SubscriptionList[indexAcceptedSubList].isSticky = isSticky; 00330 topicNameFromThingAndAction(SubscriptionList[indexRejectedSubList].Topic, pThingName, action, 00331 SHADOW_REJECTED); 00332 subParams.pTopic = SubscriptionList[indexRejectedSubList].Topic; 00333 ret_val = pMqttClient->subscribe(&subParams); 00334 if (ret_val == NONE_ERROR) { 00335 SubscriptionList[indexRejectedSubList].count = 1; 00336 SubscriptionList[indexRejectedSubList].isSticky = isSticky; 00337 clearBothEntriesFromList = false; 00338 00339 // wait for SUBSCRIBE_SETTLING_TIME seconds to let the subscription take effect 00340 Timer subSettlingtimer; 00341 InitTimer(&subSettlingtimer); 00342 countdown(&subSettlingtimer, SUBSCRIBE_SETTLING_TIME); 00343 while(!expired(&subSettlingtimer)); 00344 00345 } 00346 } 00347 } 00348 00349 if (clearBothEntriesFromList) { 00350 if (indexAcceptedSubList >= 0) { 00351 SubscriptionList[indexAcceptedSubList].isFree = true; 00352 } else if (indexRejectedSubList >= 0) { 00353 SubscriptionList[indexRejectedSubList].isFree = true; 00354 } 00355 if (SubscriptionList[indexAcceptedSubList].count == 1) { 00356 pMqttClient->unsubscribe(SubscriptionList[indexAcceptedSubList].Topic); 00357 } 00358 } 00359 00360 return ret_val; 00361 } 00362 00363 void incrementSubscriptionCnt(const char *pThingName, ShadowActions_t action, bool isSticky) { 00364 char TemporaryTopicNameAccepted[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00365 char TemporaryTopicNameRejected[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00366 uint8_t i; 00367 topicNameFromThingAndAction(TemporaryTopicNameAccepted, pThingName, action, SHADOW_ACCEPTED); 00368 topicNameFromThingAndAction(TemporaryTopicNameRejected, pThingName, action, SHADOW_REJECTED); 00369 00370 for (i = 0; i < MAX_TOPICS_AT_ANY_GIVEN_TIME; i++) { 00371 if (!SubscriptionList[i].isFree) { 00372 if ((strcmp(TemporaryTopicNameAccepted, SubscriptionList[i].Topic) == 0) 00373 || (strcmp(TemporaryTopicNameRejected, SubscriptionList[i].Topic) == 0)) { 00374 SubscriptionList[i].count++; 00375 SubscriptionList[i].isSticky = isSticky; 00376 } 00377 } 00378 } 00379 } 00380 00381 IoT_Error_t publishToShadowAction(const char * pThingName, ShadowActions_t action, const char *pJsonDocumentToBeSent) { 00382 IoT_Error_t ret_val = NONE_ERROR; 00383 char TemporaryTopicName[MAX_SHADOW_TOPIC_LENGTH_BYTES]; 00384 topicNameFromThingAndAction(TemporaryTopicName, pThingName, action, SHADOW_ACTION); 00385 00386 MQTTPublishParams pubParams = MQTTPublishParamsDefault; 00387 pubParams.pTopic = TemporaryTopicName; 00388 MQTTMessageParams msgParams = MQTTMessageParamsDefault; 00389 msgParams.qos = QOS_0; 00390 msgParams.PayloadLen = strlen(pJsonDocumentToBeSent) + 1; 00391 msgParams.pPayload = (char *) pJsonDocumentToBeSent; 00392 pubParams.MessageParams = msgParams; 00393 ret_val = pMqttClient->publish(&pubParams); 00394 00395 return ret_val; 00396 } 00397 00398 bool getNextFreeIndexOfAckWaitList(uint8_t *pIndex) { 00399 uint8_t i; 00400 if (pIndex != NULL) { 00401 for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { 00402 if (AckWaitList[i].isFree) { 00403 *pIndex = i; 00404 return true; 00405 } 00406 } 00407 } 00408 return false; 00409 } 00410 00411 void addToAckWaitList(uint8_t indexAckWaitList, const char *pThingName, ShadowActions_t action, 00412 const char *pExtractedClientToken, fpActionCallback_t callback, void *pCallbackContext, 00413 uint32_t timeout_seconds) { 00414 AckWaitList[indexAckWaitList].callback = callback; 00415 strncpy(AckWaitList[indexAckWaitList].clientTokenID, pExtractedClientToken, MAX_SIZE_CLIENT_ID_WITH_SEQUENCE); 00416 strncpy(AckWaitList[indexAckWaitList].thingName, pThingName, MAX_SIZE_OF_THING_NAME); 00417 AckWaitList[indexAckWaitList].pCallbackContext = pCallbackContext; 00418 AckWaitList[indexAckWaitList].action = action; 00419 InitTimer(&(AckWaitList[indexAckWaitList].timer)); 00420 countdown(&(AckWaitList[indexAckWaitList].timer), timeout_seconds); 00421 AckWaitList[indexAckWaitList].isFree = false; 00422 } 00423 00424 void HandleExpiredResponseCallbacks(void) { 00425 uint8_t i; 00426 for (i = 0; i < MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME; i++) { 00427 if (!AckWaitList[i].isFree) { 00428 if (expired(&(AckWaitList[i].timer))) { 00429 if (AckWaitList[i].callback != NULL) { 00430 AckWaitList[i].callback(AckWaitList[i].thingName, AckWaitList[i].action, SHADOW_ACK_TIMEOUT, 00431 shadowRxBuf, AckWaitList[i].pCallbackContext); 00432 } 00433 AckWaitList[i].isFree = true; 00434 unsubscribeFromAcceptedAndRejected(i); 00435 } 00436 } 00437 } 00438 } 00439 00440 static int shadow_delta_callback(MQTTCallbackParams params) { 00441 00442 int32_t tokenCount; 00443 uint32_t i = 0; 00444 void *pJsonHandler; 00445 int32_t DataPosition; 00446 uint32_t dataLength; 00447 00448 if (params.MessageParams.PayloadLen > SHADOW_MAX_SIZE_OF_RX_BUFFER) { 00449 return GENERIC_ERROR; 00450 } 00451 00452 memcpy(shadowRxBuf, params.MessageParams.pPayload, params.MessageParams.PayloadLen); 00453 shadowRxBuf[params.MessageParams.PayloadLen] = '\0'; // jsmn_parse relies on a string 00454 00455 if (!isJsonValidAndParse(shadowRxBuf, pJsonHandler, &tokenCount)) { 00456 WARN("Received JSON is not valid"); 00457 return GENERIC_ERROR; 00458 } 00459 00460 if (shadowDiscardOldDeltaFlag) { 00461 uint32_t tempVersionNumber = 0; 00462 if (extractVersionNumber(shadowRxBuf, pJsonHandler, tokenCount, &tempVersionNumber)) { 00463 if (tempVersionNumber > shadowJsonVersionNum) { 00464 shadowJsonVersionNum = tempVersionNumber; 00465 DEBUG("New Version number: %d", shadowJsonVersionNum); 00466 } else { 00467 WARN("Old Delta Message received - Ignoring rx: %d local: %d", tempVersionNumber, shadowJsonVersionNum); 00468 return GENERIC_ERROR; 00469 } 00470 } 00471 } 00472 00473 for (i = 0; i < tokenTableIndex; i++) { 00474 if (!tokenTable[i].isFree) { 00475 if (isJsonKeyMatchingAndUpdateValue(shadowRxBuf, pJsonHandler, tokenCount, (jsonStruct_t *)tokenTable[i].pStruct, 00476 &dataLength, &DataPosition)) { 00477 if (tokenTable[i].callback != NULL) { 00478 tokenTable[i].callback(shadowRxBuf + DataPosition, dataLength, (jsonStruct_t *)tokenTable[i].pStruct); 00479 } 00480 } 00481 } 00482 } 00483 00484 return NONE_ERROR; 00485 } 00486
Generated on Tue Jul 12 2022 22:13:20 by 1.7.2