Added function to return network interface.

Dependencies:   MQTTPacket FP

Dependents:   IBMIoTF-2

Committer:
lamell
Date:
Sat Mar 21 16:17:57 2020 -0400
Revision:
65:ff8eaf67f510
Parent:
61:feeb4b0a918f
File MQTTClient.h:
Changed MAX_MQTT_PACKET_SIZE to 2048; the original size was limited to 100 bytes.
If a message is larger than this limit, no message will be identified by the server.

Who changed what in which revision?

User | Revision | Line number | New contents of line
icraggs 20:cad3d54d7ecf 1 /*******************************************************************************
icraggs 20:cad3d54d7ecf 2 * Copyright (c) 2014 IBM Corp.
icraggs 20:cad3d54d7ecf 3 *
icraggs 20:cad3d54d7ecf 4 * All rights reserved. This program and the accompanying materials
icraggs 20:cad3d54d7ecf 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 20:cad3d54d7ecf 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
icraggs 20:cad3d54d7ecf 7 *
icraggs 20:cad3d54d7ecf 8 * The Eclipse Public License is available at
icraggs 20:cad3d54d7ecf 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 20:cad3d54d7ecf 10 * and the Eclipse Distribution License is available at
icraggs 20:cad3d54d7ecf 11 * http://www.eclipse.org/org/documents/edl-v10.php.
icraggs 20:cad3d54d7ecf 12 *
icraggs 20:cad3d54d7ecf 13 * Contributors:
icraggs 20:cad3d54d7ecf 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 20:cad3d54d7ecf 15 *******************************************************************************/
icraggs 20:cad3d54d7ecf 16
icraggs 21:e918525e529d 17 #if !defined(MQTTASYNC_H)
icraggs 21:e918525e529d 18 #define MQTTASYNC_H
icraggs 20:cad3d54d7ecf 19
icraggs 20:cad3d54d7ecf 20 #include "FP.h"
icraggs 20:cad3d54d7ecf 21 #include "MQTTPacket.h"
icraggs 20:cad3d54d7ecf 22 #include "stdio.h"
icraggs 20:cad3d54d7ecf 23
icraggs 20:cad3d54d7ecf 24 namespace MQTT
icraggs 20:cad3d54d7ecf 25 {
icraggs 20:cad3d54d7ecf 26
icraggs 20:cad3d54d7ecf 27
/** MQTT quality-of-service levels; the enumerators have the values 0, 1 and 2. */
enum QoS
{
    QOS0,
    QOS1,
    QOS2
};
icraggs 20:cad3d54d7ecf 29
icraggs 20:cad3d54d7ecf 30
icraggs 20:cad3d54d7ecf 31 struct Message
icraggs 20:cad3d54d7ecf 32 {
icraggs 20:cad3d54d7ecf 33 enum QoS qos;
icraggs 20:cad3d54d7ecf 34 bool retained;
icraggs 20:cad3d54d7ecf 35 bool dup;
icraggs 20:cad3d54d7ecf 36 unsigned short id;
icraggs 20:cad3d54d7ecf 37 void *payload;
icraggs 20:cad3d54d7ecf 38 size_t payloadlen;
icraggs 20:cad3d54d7ecf 39 };
icraggs 20:cad3d54d7ecf 40
icraggs 20:cad3d54d7ecf 41
/**
 * Source of MQTT packet identifiers.
 *
 * Only the declaration is visible here; the member functions are defined
 * elsewhere. NOTE(review): presumably getNext() cycles through
 * 1..MAX_PACKET_ID - confirm against the implementation.
 */
class PacketId
{
    static const int MAX_PACKET_ID = 65535;     // upper bound for a packet id
    int next;                                   // generator state

public:
    PacketId();

    /** @return the next packet id to use */
    int getNext();
};
icraggs 20:cad3d54d7ecf 53
icraggs 20:cad3d54d7ecf 54 typedef void (*messageHandler)(Message*);
icraggs 20:cad3d54d7ecf 55
/**
 * Sizing and timing limits for a client instance. A default-constructed
 * object carries the default values set by the constructor below.
 */
typedef struct limits
{
    int MAX_MQTT_PACKET_SIZE;       // size in bytes of the send buffer and of the receive buffer
    int MAX_MESSAGE_HANDLERS;       // each subscription requires a message handler
    int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
    int command_timeout_ms;         // value used to arm the per-command timers

    limits()
        : MAX_MQTT_PACKET_SIZE(1024),
          MAX_MESSAGE_HANDLERS(5),
          MAX_CONCURRENT_OPERATIONS(1),   // 1 indicates single-threaded mode - set to >1 for multithreaded mode
          command_timeout_ms(30000)
    {
    }
} Limits;
icraggs 20:cad3d54d7ecf 71
icraggs 22:aadb79d29330 72
icraggs 22:aadb79d29330 73 /**
icraggs 22:aadb79d29330 74 * @class Async
icraggs 22:aadb79d29330 75 * @brief non-blocking, threaded MQTT client API
icraggs 22:aadb79d29330 76 * @param Network a network class which supports send, receive
icraggs 22:aadb79d29330 77 * @param Timer a timer class with the methods:
icraggs 22:aadb79d29330 78 */
icraggs 21:e918525e529d 79 template<class Network, class Timer, class Thread, class Mutex> class Async
icraggs 20:cad3d54d7ecf 80 {
icraggs 20:cad3d54d7ecf 81
icraggs 20:cad3d54d7ecf 82 public:
icraggs 20:cad3d54d7ecf 83
icraggs 20:cad3d54d7ecf 84 struct Result
icraggs 20:cad3d54d7ecf 85 {
icraggs 20:cad3d54d7ecf 86 /* success or failure result data */
icraggs 21:e918525e529d 87 Async<Network, Timer, Thread, Mutex>* client;
icraggs 21:e918525e529d 88 int rc;
icraggs 20:cad3d54d7ecf 89 };
icraggs 20:cad3d54d7ecf 90
icraggs 20:cad3d54d7ecf 91 typedef void (*resultHandler)(Result*);
icraggs 20:cad3d54d7ecf 92
icraggs 21:e918525e529d 93 Async(Network* network, const Limits limits = Limits());
icraggs 21:e918525e529d 94
icraggs 21:e918525e529d 95 typedef struct
icraggs 21:e918525e529d 96 {
icraggs 21:e918525e529d 97 Async* client;
icraggs 21:e918525e529d 98 Network* network;
icraggs 21:e918525e529d 99 } connectionLostInfo;
icraggs 21:e918525e529d 100
icraggs 21:e918525e529d 101 typedef int (*connectionLostHandlers)(connectionLostInfo*);
icraggs 21:e918525e529d 102
icraggs 21:e918525e529d 103 /** Set the connection lost callback - called whenever the connection is lost and we should be connected
icraggs 21:e918525e529d 104 * @param clh - pointer to the callback function
icraggs 21:e918525e529d 105 */
icraggs 21:e918525e529d 106 void setConnectionLostHandler(connectionLostHandlers clh)
icraggs 21:e918525e529d 107 {
icraggs 21:e918525e529d 108 connectionLostHandler.attach(clh);
icraggs 21:e918525e529d 109 }
icraggs 21:e918525e529d 110
icraggs 21:e918525e529d 111 /** Set the default message handling callback - used for any message which does not match a subscription message handler
icraggs 21:e918525e529d 112 * @param mh - pointer to the callback function
icraggs 21:e918525e529d 113 */
icraggs 21:e918525e529d 114 void setDefaultMessageHandler(messageHandler mh)
icraggs 21:e918525e529d 115 {
icraggs 21:e918525e529d 116 defaultMessageHandler.attach(mh);
icraggs 21:e918525e529d 117 }
icraggs 20:cad3d54d7ecf 118
icraggs 21:e918525e529d 119 int connect(resultHandler fn, MQTTPacket_connectData* options = 0);
icraggs 20:cad3d54d7ecf 120
icraggs 20:cad3d54d7ecf 121 template<class T>
icraggs 21:e918525e529d 122 int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function
icraggs 20:cad3d54d7ecf 123
icraggs 21:e918525e529d 124 int publish(resultHandler rh, const char* topic, Message* message);
icraggs 20:cad3d54d7ecf 125
icraggs 21:e918525e529d 126 int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);
icraggs 20:cad3d54d7ecf 127
icraggs 21:e918525e529d 128 int unsubscribe(resultHandler rh, const char* topicFilter);
icraggs 20:cad3d54d7ecf 129
icraggs 21:e918525e529d 130 int disconnect(resultHandler rh);
icraggs 20:cad3d54d7ecf 131
icraggs 20:cad3d54d7ecf 132 private:
icraggs 20:cad3d54d7ecf 133
icraggs 20:cad3d54d7ecf 134 void run(void const *argument);
icraggs 20:cad3d54d7ecf 135 int cycle(int timeout);
icraggs 20:cad3d54d7ecf 136 int waitfor(int packet_type, Timer& atimer);
icraggs 20:cad3d54d7ecf 137 int keepalive();
icraggs 20:cad3d54d7ecf 138 int findFreeOperation();
icraggs 20:cad3d54d7ecf 139
icraggs 20:cad3d54d7ecf 140 int decodePacket(int* value, int timeout);
icraggs 20:cad3d54d7ecf 141 int readPacket(int timeout);
icraggs 20:cad3d54d7ecf 142 int sendPacket(int length, int timeout);
icraggs 20:cad3d54d7ecf 143 int deliverMessage(MQTTString* topic, Message* message);
icraggs 20:cad3d54d7ecf 144
icraggs 20:cad3d54d7ecf 145 Thread* thread;
icraggs 20:cad3d54d7ecf 146 Network* ipstack;
icraggs 20:cad3d54d7ecf 147
icraggs 20:cad3d54d7ecf 148 Limits limits;
icraggs 20:cad3d54d7ecf 149
icraggs 20:cad3d54d7ecf 150 char* buf;
icraggs 20:cad3d54d7ecf 151 char* readbuf;
icraggs 20:cad3d54d7ecf 152
icraggs 20:cad3d54d7ecf 153 Timer ping_timer, connect_timer;
icraggs 20:cad3d54d7ecf 154 unsigned int keepAliveInterval;
icraggs 20:cad3d54d7ecf 155 bool ping_outstanding;
icraggs 20:cad3d54d7ecf 156
icraggs 20:cad3d54d7ecf 157 PacketId packetid;
icraggs 20:cad3d54d7ecf 158
icraggs 20:cad3d54d7ecf 159 typedef FP<void, Result*> resultHandlerFP;
icraggs 20:cad3d54d7ecf 160 resultHandlerFP connectHandler;
icraggs 20:cad3d54d7ecf 161
icraggs 20:cad3d54d7ecf 162 typedef FP<void, Message*> messageHandlerFP;
icraggs 20:cad3d54d7ecf 163 struct MessageHandlers
icraggs 20:cad3d54d7ecf 164 {
icraggs 20:cad3d54d7ecf 165 const char* topic;
icraggs 20:cad3d54d7ecf 166 messageHandlerFP fp;
icraggs 20:cad3d54d7ecf 167 } *messageHandlers; // Message handlers are indexed by subscription topic
icraggs 20:cad3d54d7ecf 168
icraggs 20:cad3d54d7ecf 169 // how many concurrent operations should we allow? Each one will require a function pointer
icraggs 20:cad3d54d7ecf 170 struct Operations
icraggs 20:cad3d54d7ecf 171 {
icraggs 20:cad3d54d7ecf 172 unsigned short id;
icraggs 20:cad3d54d7ecf 173 resultHandlerFP fp;
icraggs 20:cad3d54d7ecf 174 const char* topic; // if this is a publish, store topic name in case republishing is required
icraggs 20:cad3d54d7ecf 175 Message* message; // for publish,
icraggs 20:cad3d54d7ecf 176 Timer timer; // to check if the command has timed out
icraggs 20:cad3d54d7ecf 177 } *operations; // result handlers are indexed by packet ids
icraggs 20:cad3d54d7ecf 178
icraggs 20:cad3d54d7ecf 179 static void threadfn(void* arg);
icraggs 21:e918525e529d 180
icraggs 21:e918525e529d 181 messageHandlerFP defaultMessageHandler;
icraggs 21:e918525e529d 182
icraggs 21:e918525e529d 183 typedef FP<int, connectionLostInfo*> connectionLostFP;
icraggs 21:e918525e529d 184
icraggs 21:e918525e529d 185 connectionLostFP connectionLostHandler;
icraggs 20:cad3d54d7ecf 186
icraggs 20:cad3d54d7ecf 187 };
icraggs 20:cad3d54d7ecf 188
icraggs 20:cad3d54d7ecf 189 }
icraggs 20:cad3d54d7ecf 190
icraggs 20:cad3d54d7ecf 191
icraggs 21:e918525e529d 192 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
icraggs 20:cad3d54d7ecf 193 {
icraggs 21:e918525e529d 194 ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
icraggs 20:cad3d54d7ecf 195 }
icraggs 20:cad3d54d7ecf 196
icraggs 20:cad3d54d7ecf 197
icraggs 21:e918525e529d 198 template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid()
icraggs 20:cad3d54d7ecf 199 {
icraggs 20:cad3d54d7ecf 200 this->thread = 0;
icraggs 20:cad3d54d7ecf 201 this->ipstack = network;
icraggs 20:cad3d54d7ecf 202 this->ping_timer = Timer();
icraggs 20:cad3d54d7ecf 203 this->ping_outstanding = 0;
icraggs 20:cad3d54d7ecf 204
icraggs 20:cad3d54d7ecf 205 // How to make these memory allocations portable? I was hoping to avoid the heap
icraggs 20:cad3d54d7ecf 206 buf = new char[limits.MAX_MQTT_PACKET_SIZE];
icraggs 20:cad3d54d7ecf 207 readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
icraggs 20:cad3d54d7ecf 208 this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS];
icraggs 20:cad3d54d7ecf 209 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
icraggs 20:cad3d54d7ecf 210 operations[i].id = 0;
icraggs 20:cad3d54d7ecf 211 this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
icraggs 20:cad3d54d7ecf 212 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
icraggs 20:cad3d54d7ecf 213 messageHandlers[i].topic = 0;
icraggs 20:cad3d54d7ecf 214 }
icraggs 20:cad3d54d7ecf 215
icraggs 20:cad3d54d7ecf 216
icraggs 21:e918525e529d 217 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
icraggs 20:cad3d54d7ecf 218 {
icraggs 20:cad3d54d7ecf 219 int sent = 0;
icraggs 20:cad3d54d7ecf 220
icraggs 20:cad3d54d7ecf 221 while (sent < length)
icraggs 20:cad3d54d7ecf 222 sent += ipstack->write(&buf[sent], length, timeout);
icraggs 20:cad3d54d7ecf 223 if (sent == length)
icraggs 20:cad3d54d7ecf 224 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
icraggs 20:cad3d54d7ecf 225 return sent;
icraggs 20:cad3d54d7ecf 226 }
icraggs 20:cad3d54d7ecf 227
icraggs 20:cad3d54d7ecf 228
icraggs 21:e918525e529d 229 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
icraggs 20:cad3d54d7ecf 230 {
icraggs 20:cad3d54d7ecf 231 char c;
icraggs 20:cad3d54d7ecf 232 int multiplier = 1;
icraggs 20:cad3d54d7ecf 233 int len = 0;
icraggs 20:cad3d54d7ecf 234 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
icraggs 20:cad3d54d7ecf 235
icraggs 20:cad3d54d7ecf 236 *value = 0;
icraggs 20:cad3d54d7ecf 237 do
icraggs 20:cad3d54d7ecf 238 {
icraggs 20:cad3d54d7ecf 239 int rc = MQTTPACKET_READ_ERROR;
icraggs 20:cad3d54d7ecf 240
icraggs 20:cad3d54d7ecf 241 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 20:cad3d54d7ecf 242 {
icraggs 20:cad3d54d7ecf 243 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 20:cad3d54d7ecf 244 goto exit;
icraggs 20:cad3d54d7ecf 245 }
icraggs 20:cad3d54d7ecf 246 rc = ipstack->read(&c, 1, timeout);
icraggs 20:cad3d54d7ecf 247 if (rc != 1)
icraggs 20:cad3d54d7ecf 248 goto exit;
icraggs 20:cad3d54d7ecf 249 *value += (c & 127) * multiplier;
icraggs 20:cad3d54d7ecf 250 multiplier *= 128;
icraggs 20:cad3d54d7ecf 251 } while ((c & 128) != 0);
icraggs 20:cad3d54d7ecf 252 exit:
icraggs 20:cad3d54d7ecf 253 return len;
icraggs 20:cad3d54d7ecf 254 }
icraggs 20:cad3d54d7ecf 255
icraggs 20:cad3d54d7ecf 256
icraggs 20:cad3d54d7ecf 257 /**
icraggs 20:cad3d54d7ecf 258 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 20:cad3d54d7ecf 259 * the packets can be retried.
icraggs 20:cad3d54d7ecf 260 * @param timeout the max time to wait for the packet read to complete, in milliseconds
icraggs 20:cad3d54d7ecf 261 * @return the MQTT packet type, or -1 if none
icraggs 20:cad3d54d7ecf 262 */
icraggs 21:e918525e529d 263 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout)
icraggs 20:cad3d54d7ecf 264 {
icraggs 20:cad3d54d7ecf 265 int rc = -1;
icraggs 20:cad3d54d7ecf 266 MQTTHeader header = {0};
icraggs 20:cad3d54d7ecf 267 int len = 0;
icraggs 20:cad3d54d7ecf 268 int rem_len = 0;
icraggs 20:cad3d54d7ecf 269
icraggs 20:cad3d54d7ecf 270 /* 1. read the header byte. This has the packet type in it */
icraggs 20:cad3d54d7ecf 271 if (ipstack->read(readbuf, 1, timeout) != 1)
icraggs 20:cad3d54d7ecf 272 goto exit;
icraggs 20:cad3d54d7ecf 273
icraggs 20:cad3d54d7ecf 274 len = 1;
icraggs 20:cad3d54d7ecf 275 /* 2. read the remaining length. This is variable in itself */
icraggs 20:cad3d54d7ecf 276 decodePacket(&rem_len, timeout);
icraggs 20:cad3d54d7ecf 277 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 20:cad3d54d7ecf 278
icraggs 20:cad3d54d7ecf 279 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 20:cad3d54d7ecf 280 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
icraggs 20:cad3d54d7ecf 281 goto exit;
icraggs 20:cad3d54d7ecf 282
icraggs 20:cad3d54d7ecf 283 header.byte = readbuf[0];
icraggs 20:cad3d54d7ecf 284 rc = header.bits.type;
icraggs 20:cad3d54d7ecf 285 exit:
icraggs 20:cad3d54d7ecf 286 return rc;
icraggs 20:cad3d54d7ecf 287 }
icraggs 20:cad3d54d7ecf 288
icraggs 20:cad3d54d7ecf 289
icraggs 21:e918525e529d 290 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
icraggs 20:cad3d54d7ecf 291 {
icraggs 20:cad3d54d7ecf 292 int rc = -1;
icraggs 20:cad3d54d7ecf 293
icraggs 20:cad3d54d7ecf 294 // we have to find the right message handler - indexed by topic
icraggs 20:cad3d54d7ecf 295 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
icraggs 20:cad3d54d7ecf 296 {
icraggs 20:cad3d54d7ecf 297 if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
icraggs 20:cad3d54d7ecf 298 {
icraggs 20:cad3d54d7ecf 299 messageHandlers[i].fp(message);
icraggs 20:cad3d54d7ecf 300 rc = 0;
icraggs 20:cad3d54d7ecf 301 break;
icraggs 20:cad3d54d7ecf 302 }
icraggs 20:cad3d54d7ecf 303 }
icraggs 20:cad3d54d7ecf 304
icraggs 20:cad3d54d7ecf 305 return rc;
icraggs 20:cad3d54d7ecf 306 }
icraggs 20:cad3d54d7ecf 307
icraggs 20:cad3d54d7ecf 308
icraggs 20:cad3d54d7ecf 309
icraggs 21:e918525e529d 310 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
icraggs 20:cad3d54d7ecf 311 {
icraggs 20:cad3d54d7ecf 312 /* get one piece of work off the wire and one pass through */
icraggs 20:cad3d54d7ecf 313
icraggs 20:cad3d54d7ecf 314 // read the socket, see what work is due
icraggs 20:cad3d54d7ecf 315 int packet_type = readPacket(timeout);
icraggs 20:cad3d54d7ecf 316
icraggs 20:cad3d54d7ecf 317 int len, rc;
icraggs 20:cad3d54d7ecf 318 switch (packet_type)
icraggs 20:cad3d54d7ecf 319 {
icraggs 20:cad3d54d7ecf 320 case CONNACK:
icraggs 20:cad3d54d7ecf 321 if (this->thread)
icraggs 20:cad3d54d7ecf 322 {
icraggs 20:cad3d54d7ecf 323 Result res = {this, 0};
icraggs 21:e918525e529d 324 if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 325 ;
icraggs 20:cad3d54d7ecf 326 connectHandler(&res);
icraggs 20:cad3d54d7ecf 327 connectHandler.detach(); // only invoke the callback once
icraggs 20:cad3d54d7ecf 328 }
icraggs 20:cad3d54d7ecf 329 break;
icraggs 20:cad3d54d7ecf 330 case PUBACK:
icraggs 20:cad3d54d7ecf 331 if (this->thread)
icraggs 20:cad3d54d7ecf 332 ; //call resultHandler
icraggs 20:cad3d54d7ecf 333 case SUBACK:
icraggs 20:cad3d54d7ecf 334 break;
icraggs 20:cad3d54d7ecf 335 case PUBLISH:
icraggs 20:cad3d54d7ecf 336 MQTTString topicName;
icraggs 20:cad3d54d7ecf 337 Message msg;
icraggs 20:cad3d54d7ecf 338 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
icraggs 20:cad3d54d7ecf 339 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);;
icraggs 20:cad3d54d7ecf 340 if (msg.qos == QOS0)
icraggs 20:cad3d54d7ecf 341 deliverMessage(&topicName, &msg);
icraggs 20:cad3d54d7ecf 342 break;
icraggs 20:cad3d54d7ecf 343 case PUBREC:
icraggs 20:cad3d54d7ecf 344 int type, dup, mypacketid;
icraggs 20:cad3d54d7ecf 345 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 346 ;
icraggs 20:cad3d54d7ecf 347 // must lock this access against the application thread, if we are multi-threaded
icraggs 20:cad3d54d7ecf 348 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
icraggs 20:cad3d54d7ecf 349 rc = sendPacket(len, timeout); // send the PUBREL packet
icraggs 20:cad3d54d7ecf 350 if (rc != len)
icraggs 20:cad3d54d7ecf 351 goto exit; // there was a problem
icraggs 20:cad3d54d7ecf 352
icraggs 20:cad3d54d7ecf 353 break;
icraggs 20:cad3d54d7ecf 354 case PUBCOMP:
icraggs 20:cad3d54d7ecf 355 break;
icraggs 20:cad3d54d7ecf 356 case PINGRESP:
icraggs 20:cad3d54d7ecf 357 ping_outstanding = false;
icraggs 20:cad3d54d7ecf 358 break;
icraggs 20:cad3d54d7ecf 359 }
icraggs 20:cad3d54d7ecf 360 keepalive();
icraggs 20:cad3d54d7ecf 361 exit:
icraggs 20:cad3d54d7ecf 362 return packet_type;
icraggs 20:cad3d54d7ecf 363 }
icraggs 20:cad3d54d7ecf 364
icraggs 20:cad3d54d7ecf 365
icraggs 21:e918525e529d 366 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
icraggs 20:cad3d54d7ecf 367 {
icraggs 20:cad3d54d7ecf 368 int rc = 0;
icraggs 20:cad3d54d7ecf 369
icraggs 20:cad3d54d7ecf 370 if (keepAliveInterval == 0)
icraggs 20:cad3d54d7ecf 371 goto exit;
icraggs 20:cad3d54d7ecf 372
icraggs 20:cad3d54d7ecf 373 if (ping_timer.expired())
icraggs 20:cad3d54d7ecf 374 {
icraggs 20:cad3d54d7ecf 375 if (ping_outstanding)
icraggs 20:cad3d54d7ecf 376 rc = -1;
icraggs 20:cad3d54d7ecf 377 else
icraggs 20:cad3d54d7ecf 378 {
icraggs 20:cad3d54d7ecf 379 int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
icraggs 20:cad3d54d7ecf 380 rc = sendPacket(len, 1000); // send the ping packet
icraggs 20:cad3d54d7ecf 381 if (rc != len)
icraggs 20:cad3d54d7ecf 382 rc = -1; // indicate there's a problem
icraggs 20:cad3d54d7ecf 383 else
icraggs 20:cad3d54d7ecf 384 ping_outstanding = true;
icraggs 20:cad3d54d7ecf 385 }
icraggs 20:cad3d54d7ecf 386 }
icraggs 20:cad3d54d7ecf 387
icraggs 20:cad3d54d7ecf 388 exit:
icraggs 20:cad3d54d7ecf 389 return rc;
icraggs 20:cad3d54d7ecf 390 }
icraggs 20:cad3d54d7ecf 391
icraggs 20:cad3d54d7ecf 392
icraggs 21:e918525e529d 393 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
icraggs 20:cad3d54d7ecf 394 {
icraggs 20:cad3d54d7ecf 395 while (true)
icraggs 20:cad3d54d7ecf 396 cycle(ping_timer.left_ms());
icraggs 20:cad3d54d7ecf 397 }
icraggs 20:cad3d54d7ecf 398
icraggs 20:cad3d54d7ecf 399
icraggs 20:cad3d54d7ecf 400 // only used in single-threaded mode where one command at a time is in process
icraggs 21:e918525e529d 401 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
icraggs 20:cad3d54d7ecf 402 {
icraggs 20:cad3d54d7ecf 403 int rc = -1;
icraggs 20:cad3d54d7ecf 404
icraggs 20:cad3d54d7ecf 405 do
icraggs 20:cad3d54d7ecf 406 {
icraggs 20:cad3d54d7ecf 407 if (atimer.expired())
icraggs 20:cad3d54d7ecf 408 break; // we timed out
icraggs 20:cad3d54d7ecf 409 }
icraggs 20:cad3d54d7ecf 410 while ((rc = cycle(atimer.left_ms())) != packet_type);
icraggs 20:cad3d54d7ecf 411
icraggs 20:cad3d54d7ecf 412 return rc;
icraggs 20:cad3d54d7ecf 413 }
icraggs 20:cad3d54d7ecf 414
icraggs 20:cad3d54d7ecf 415
icraggs 21:e918525e529d 416 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
icraggs 20:cad3d54d7ecf 417 {
icraggs 21:e918525e529d 418 connect_timer.countdown(limits.command_timeout_ms);
icraggs 20:cad3d54d7ecf 419
icraggs 20:cad3d54d7ecf 420 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 20:cad3d54d7ecf 421 if (options == 0)
icraggs 20:cad3d54d7ecf 422 options = &default_options; // set default options if none were supplied
icraggs 20:cad3d54d7ecf 423
icraggs 20:cad3d54d7ecf 424 this->keepAliveInterval = options->keepAliveInterval;
icraggs 20:cad3d54d7ecf 425 ping_timer.countdown(this->keepAliveInterval);
icraggs 20:cad3d54d7ecf 426 int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
icraggs 20:cad3d54d7ecf 427 int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet
icraggs 20:cad3d54d7ecf 428 if (rc != len)
icraggs 20:cad3d54d7ecf 429 goto exit; // there was a problem
icraggs 20:cad3d54d7ecf 430
icraggs 20:cad3d54d7ecf 431 if (resultHandler == 0) // wait until the connack is received
icraggs 20:cad3d54d7ecf 432 {
icraggs 20:cad3d54d7ecf 433 // this will be a blocking call, wait for the connack
icraggs 20:cad3d54d7ecf 434 if (waitfor(CONNACK, connect_timer) == CONNACK)
icraggs 20:cad3d54d7ecf 435 {
icraggs 20:cad3d54d7ecf 436 int connack_rc = -1;
icraggs 20:cad3d54d7ecf 437 if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 438 rc = connack_rc;
icraggs 20:cad3d54d7ecf 439 }
icraggs 20:cad3d54d7ecf 440 }
icraggs 20:cad3d54d7ecf 441 else
icraggs 20:cad3d54d7ecf 442 {
icraggs 20:cad3d54d7ecf 443 // set connect response callback function
icraggs 20:cad3d54d7ecf 444 connectHandler.attach(resultHandler);
icraggs 20:cad3d54d7ecf 445
icraggs 20:cad3d54d7ecf 446 // start background thread
icraggs 21:e918525e529d 447 this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
icraggs 20:cad3d54d7ecf 448 }
icraggs 20:cad3d54d7ecf 449
icraggs 20:cad3d54d7ecf 450 exit:
icraggs 20:cad3d54d7ecf 451 return rc;
icraggs 20:cad3d54d7ecf 452 }
icraggs 20:cad3d54d7ecf 453
icraggs 20:cad3d54d7ecf 454
icraggs 21:e918525e529d 455 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
icraggs 20:cad3d54d7ecf 456 {
icraggs 20:cad3d54d7ecf 457 int found = -1;
icraggs 20:cad3d54d7ecf 458 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
icraggs 20:cad3d54d7ecf 459 {
icraggs 20:cad3d54d7ecf 460 if (operations[i].id == 0)
icraggs 20:cad3d54d7ecf 461 {
icraggs 20:cad3d54d7ecf 462 found = i;
icraggs 20:cad3d54d7ecf 463 break;
icraggs 20:cad3d54d7ecf 464 }
icraggs 20:cad3d54d7ecf 465 }
icraggs 20:cad3d54d7ecf 466 return found;
icraggs 20:cad3d54d7ecf 467 }
icraggs 20:cad3d54d7ecf 468
icraggs 20:cad3d54d7ecf 469
icraggs 21:e918525e529d 470 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
icraggs 20:cad3d54d7ecf 471 {
icraggs 20:cad3d54d7ecf 472 int index = 0;
icraggs 20:cad3d54d7ecf 473 if (this->thread)
icraggs 20:cad3d54d7ecf 474 index = findFreeOperation();
icraggs 20:cad3d54d7ecf 475 Timer& atimer = operations[index].timer;
icraggs 20:cad3d54d7ecf 476
icraggs 21:e918525e529d 477 atimer.countdown(limits.command_timeout_ms);
icraggs 20:cad3d54d7ecf 478 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 20:cad3d54d7ecf 479
icraggs 20:cad3d54d7ecf 480 int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
icraggs 20:cad3d54d7ecf 481 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
icraggs 20:cad3d54d7ecf 482 if (rc != len)
icraggs 20:cad3d54d7ecf 483 goto exit; // there was a problem
icraggs 20:cad3d54d7ecf 484
icraggs 20:cad3d54d7ecf 485 /* wait for suback */
icraggs 20:cad3d54d7ecf 486 if (resultHandler == 0)
icraggs 20:cad3d54d7ecf 487 {
icraggs 20:cad3d54d7ecf 488 // this will block
icraggs 20:cad3d54d7ecf 489 if (waitfor(SUBACK, atimer) == SUBACK)
icraggs 20:cad3d54d7ecf 490 {
icraggs 20:cad3d54d7ecf 491 int count = 0, grantedQoS = -1, mypacketid;
icraggs 20:cad3d54d7ecf 492 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 493 rc = grantedQoS; // 0, 1, 2 or 0x80
icraggs 20:cad3d54d7ecf 494 if (rc != 0x80)
icraggs 20:cad3d54d7ecf 495 {
icraggs 20:cad3d54d7ecf 496 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
icraggs 20:cad3d54d7ecf 497 {
icraggs 20:cad3d54d7ecf 498 if (messageHandlers[i].topic == 0)
icraggs 20:cad3d54d7ecf 499 {
icraggs 20:cad3d54d7ecf 500 messageHandlers[i].topic = topicFilter;
icraggs 20:cad3d54d7ecf 501 messageHandlers[i].fp.attach(messageHandler);
icraggs 20:cad3d54d7ecf 502 rc = 0;
icraggs 20:cad3d54d7ecf 503 break;
icraggs 20:cad3d54d7ecf 504 }
icraggs 20:cad3d54d7ecf 505 }
icraggs 20:cad3d54d7ecf 506 }
icraggs 20:cad3d54d7ecf 507 }
icraggs 20:cad3d54d7ecf 508 }
icraggs 20:cad3d54d7ecf 509 else
icraggs 20:cad3d54d7ecf 510 {
icraggs 20:cad3d54d7ecf 511 // set subscribe response callback function
icraggs 20:cad3d54d7ecf 512
icraggs 20:cad3d54d7ecf 513 }
icraggs 20:cad3d54d7ecf 514
icraggs 20:cad3d54d7ecf 515 exit:
icraggs 20:cad3d54d7ecf 516 return rc;
icraggs 20:cad3d54d7ecf 517 }
icraggs 20:cad3d54d7ecf 518
icraggs 20:cad3d54d7ecf 519
icraggs 21:e918525e529d 520 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
icraggs 20:cad3d54d7ecf 521 {
icraggs 20:cad3d54d7ecf 522 int index = 0;
icraggs 20:cad3d54d7ecf 523 if (this->thread)
icraggs 20:cad3d54d7ecf 524 index = findFreeOperation();
icraggs 20:cad3d54d7ecf 525 Timer& atimer = operations[index].timer;
icraggs 20:cad3d54d7ecf 526
icraggs 21:e918525e529d 527 atimer.countdown(limits.command_timeout_ms);
icraggs 20:cad3d54d7ecf 528 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 20:cad3d54d7ecf 529
icraggs 20:cad3d54d7ecf 530 int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
icraggs 20:cad3d54d7ecf 531 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
icraggs 20:cad3d54d7ecf 532 if (rc != len)
icraggs 20:cad3d54d7ecf 533 goto exit; // there was a problem
icraggs 20:cad3d54d7ecf 534
icraggs 21:e918525e529d 535 // set unsubscribe response callback function
icraggs 20:cad3d54d7ecf 536
icraggs 20:cad3d54d7ecf 537
icraggs 20:cad3d54d7ecf 538 exit:
icraggs 20:cad3d54d7ecf 539 return rc;
icraggs 20:cad3d54d7ecf 540 }
icraggs 20:cad3d54d7ecf 541
icraggs 20:cad3d54d7ecf 542
icraggs 20:cad3d54d7ecf 543
icraggs 21:e918525e529d 544 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
icraggs 20:cad3d54d7ecf 545 {
icraggs 20:cad3d54d7ecf 546 int index = 0;
icraggs 20:cad3d54d7ecf 547 if (this->thread)
icraggs 20:cad3d54d7ecf 548 index = findFreeOperation();
icraggs 20:cad3d54d7ecf 549 Timer& atimer = operations[index].timer;
icraggs 20:cad3d54d7ecf 550
icraggs 21:e918525e529d 551 atimer.countdown(limits.command_timeout_ms);
icraggs 20:cad3d54d7ecf 552 MQTTString topic = {(char*)topicName, 0, 0};
icraggs 20:cad3d54d7ecf 553
icraggs 20:cad3d54d7ecf 554 if (message->qos == QOS1 || message->qos == QOS2)
icraggs 20:cad3d54d7ecf 555 message->id = packetid.getNext();
icraggs 20:cad3d54d7ecf 556
icraggs 20:cad3d54d7ecf 557 int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen);
icraggs 20:cad3d54d7ecf 558 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
icraggs 20:cad3d54d7ecf 559 if (rc != len)
icraggs 20:cad3d54d7ecf 560 goto exit; // there was a problem
icraggs 20:cad3d54d7ecf 561
icraggs 20:cad3d54d7ecf 562 /* wait for acks */
icraggs 20:cad3d54d7ecf 563 if (resultHandler == 0)
icraggs 20:cad3d54d7ecf 564 {
icraggs 20:cad3d54d7ecf 565 if (message->qos == QOS1)
icraggs 20:cad3d54d7ecf 566 {
icraggs 20:cad3d54d7ecf 567 if (waitfor(PUBACK, atimer) == PUBACK)
icraggs 20:cad3d54d7ecf 568 {
icraggs 20:cad3d54d7ecf 569 int type, dup, mypacketid;
icraggs 20:cad3d54d7ecf 570 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 571 rc = 0;
icraggs 20:cad3d54d7ecf 572 }
icraggs 20:cad3d54d7ecf 573 }
icraggs 20:cad3d54d7ecf 574 else if (message->qos == QOS2)
icraggs 20:cad3d54d7ecf 575 {
icraggs 20:cad3d54d7ecf 576 if (waitfor(PUBCOMP, atimer) == PUBCOMP)
icraggs 20:cad3d54d7ecf 577 {
icraggs 20:cad3d54d7ecf 578 int type, dup, mypacketid;
icraggs 20:cad3d54d7ecf 579 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 580 rc = 0;
icraggs 20:cad3d54d7ecf 581 }
icraggs 20:cad3d54d7ecf 582
icraggs 20:cad3d54d7ecf 583 }
icraggs 20:cad3d54d7ecf 584 }
icraggs 20:cad3d54d7ecf 585 else
icraggs 20:cad3d54d7ecf 586 {
icraggs 20:cad3d54d7ecf 587 // set publish response callback function
icraggs 20:cad3d54d7ecf 588
icraggs 20:cad3d54d7ecf 589 }
icraggs 20:cad3d54d7ecf 590
icraggs 20:cad3d54d7ecf 591 exit:
icraggs 20:cad3d54d7ecf 592 return rc;
icraggs 20:cad3d54d7ecf 593 }
icraggs 20:cad3d54d7ecf 594
icraggs 20:cad3d54d7ecf 595
icraggs 21:e918525e529d 596 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
icraggs 21:e918525e529d 597 {
icraggs 21:e918525e529d 598 Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete
icraggs 21:e918525e529d 599 int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
icraggs 21:e918525e529d 600 int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet
icraggs 21:e918525e529d 601
icraggs 21:e918525e529d 602 return (rc == len) ? 0 : -1;
icraggs 21:e918525e529d 603 }
icraggs 21:e918525e529d 604
icraggs 21:e918525e529d 605
icraggs 21:e918525e529d 606
icraggs 23:05fc7de97d4a 607 #endif