Added function to return network interface.

Dependencies:   MQTTPacket FP

Dependents:   IBMIoTF-2

Committer:
lamell
Date:
Sat Mar 21 16:17:57 2020 -0400
Revision:
65:ff8eaf67f510
Parent:
64:9adad8863791
Parent:
63:90c73a6bfa42
File MQTTClient.h:
Changed MAX_MQTT_PACKET_SIZE to 2048; the original size was limited to 100 bytes.
If a message is larger than this limit, then no message will be identified by the server.

Who changed what in which revision?

User | Revision | Line number | New contents of line
icraggs 6:4d312a49200b 1 /*******************************************************************************
icraggs 53:15b5a280d22d 2 * Copyright (c) 2014, 2017 IBM Corp.
sam_grove 0:fe461e4d7afe 3 *
icraggs 6:4d312a49200b 4 * All rights reserved. This program and the accompanying materials
icraggs 6:4d312a49200b 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 6:4d312a49200b 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
sam_grove 0:fe461e4d7afe 7 *
icraggs 6:4d312a49200b 8 * The Eclipse Public License is available at
icraggs 6:4d312a49200b 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 6:4d312a49200b 10 * and the Eclipse Distribution License is available at
icraggs 6:4d312a49200b 11 * http://www.eclipse.org/org/documents/edl-v10.php.
sam_grove 0:fe461e4d7afe 12 *
icraggs 6:4d312a49200b 13 * Contributors:
icraggs 6:4d312a49200b 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 46:e335fcc1a663 15 * Ian Craggs - fix for bug 458512 - QoS 2 messages
icraggs 46:e335fcc1a663 16 * Ian Craggs - fix for bug 460389 - send loop uses wrong length
icraggs 46:e335fcc1a663 17 * Ian Craggs - fix for bug 464169 - clearing subscriptions
icraggs 46:e335fcc1a663 18 * Ian Craggs - fix for bug 464551 - enums and ints can be different size
icraggs 53:15b5a280d22d 19 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
icraggs 53:15b5a280d22d 20 * Ian Craggs - fix for bug 475749 - packetid modified twice
icraggs 53:15b5a280d22d 21 * Ian Craggs - add ability to set message handler separately #6
icraggs 6:4d312a49200b 22 *******************************************************************************/
sam_grove 0:fe461e4d7afe 23
icraggs 2:dcfdd2abfe71 24 #if !defined(MQTTCLIENT_H)
icraggs 2:dcfdd2abfe71 25 #define MQTTCLIENT_H
icraggs 2:dcfdd2abfe71 26
lamell 60:fccb353e53aa 27 #include <string.h>
icraggs 34:e18a166198df 28 #include "FP.h"
icraggs 3:dbff6b768d28 29 #include "MQTTPacket.h"
icraggs 53:15b5a280d22d 30 #include <stdio.h>
icraggs 43:21da1f744243 31 #include "MQTTLogging.h"
icraggs 43:21da1f744243 32
icraggs 43:21da1f744243 33 #if !defined(MQTTCLIENT_QOS1)
icraggs 43:21da1f744243 34 #define MQTTCLIENT_QOS1 1
icraggs 43:21da1f744243 35 #endif
icraggs 43:21da1f744243 36 #if !defined(MQTTCLIENT_QOS2)
icraggs 43:21da1f744243 37 #define MQTTCLIENT_QOS2 0
icraggs 43:21da1f744243 38 #endif
icraggs 2:dcfdd2abfe71 39
icraggs 3:dbff6b768d28 40 namespace MQTT
icraggs 3:dbff6b768d28 41 {
icraggs 3:dbff6b768d28 42
icraggs 2:dcfdd2abfe71 43
// MQTT quality-of-service levels; the enumerators take the values 0, 1 and 2.
enum QoS
{
    QOS0,
    QOS1,
    QOS2
};
sam_grove 0:fe461e4d7afe 45
// Result codes returned by the client API.
// All failure return codes must be negative.
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
icraggs 23:05fc7de97d4a 48
icraggs 2:dcfdd2abfe71 49
icraggs 3:dbff6b768d28 50 struct Message
icraggs 2:dcfdd2abfe71 51 {
icraggs 2:dcfdd2abfe71 52 enum QoS qos;
icraggs 2:dcfdd2abfe71 53 bool retained;
icraggs 2:dcfdd2abfe71 54 bool dup;
Ian Craggs 12:cc7f2d62a393 55 unsigned short id;
icraggs 2:dcfdd2abfe71 56 void *payload;
icraggs 2:dcfdd2abfe71 57 size_t payloadlen;
sam_grove 0:fe461e4d7afe 58 };
sam_grove 0:fe461e4d7afe 59
icraggs 4:4ef00243708e 60
icraggs 20:cad3d54d7ecf 61 struct MessageData
icraggs 20:cad3d54d7ecf 62 {
icraggs 31:a51dd239b78e 63 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
icraggs 37:e3d64f9b986c 64 { }
icraggs 43:21da1f744243 65
icraggs 31:a51dd239b78e 66 struct Message &message;
icraggs 31:a51dd239b78e 67 MQTTString &topicName;
icraggs 20:cad3d54d7ecf 68 };
icraggs 20:cad3d54d7ecf 69
icraggs 20:cad3d54d7ecf 70
// Output structure for the connect() overload that reports connack details.
struct connackData
{
    int rc;                // return code reported with the connack
    bool sessionPresent;   // session-present flag reported with the connack
};
icraggs 53:15b5a280d22d 76
icraggs 53:15b5a280d22d 77
// Output structure for the subscribe() overload that reports suback details.
struct subackData
{
    int grantedQoS;   // QoS value reported with the suback
};
icraggs 53:15b5a280d22d 82
icraggs 53:15b5a280d22d 83
// Generator of packet identifiers: hands out 1, 2, ..., MAX_PACKET_ID and then
// wraps back to 1, so 0 is never returned.
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    // Advance to the following identifier and return it.
    int getNext()
    {
        if (next == MAX_PACKET_ID)
            next = 1;   // wrap around, skipping 0
        else
            ++next;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // most recently issued identifier (0 before the first call)
};
icraggs 31:a51dd239b78e 101
icraggs 31:a51dd239b78e 102
icraggs 21:e918525e529d 103 /**
icraggs 21:e918525e529d 104 * @class Client
icraggs 22:aadb79d29330 105 * @brief blocking, non-threaded MQTT client API
icraggs 43:21da1f744243 106 *
icraggs 23:05fc7de97d4a 107 * This version of the API blocks on all method calls, until they are complete. This means that only one
icraggs 43:21da1f744243 108 * MQTT request can be in process at any one time.
icraggs 21:e918525e529d 109 * @param Network a network class which supports send, receive
icraggs 43:21da1f744243 110 * @param Timer a timer class with the methods:
icraggs 43:21da1f744243 111 */
lamell 64:9adad8863791 112 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 2048, int MAX_MESSAGE_HANDLERS = 5>
icraggs 31:a51dd239b78e 113 class Client
icraggs 2:dcfdd2abfe71 114 {
icraggs 43:21da1f744243 115
icraggs 22:aadb79d29330 116 public:
icraggs 43:21da1f744243 117
icraggs 31:a51dd239b78e 118 typedef void (*messageHandler)(MessageData&);
icraggs 23:05fc7de97d4a 119
icraggs 23:05fc7de97d4a 120 /** Construct the client
icraggs 23:05fc7de97d4a 121 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
icraggs 23:05fc7de97d4a 122 * before calling MQTT connect
icraggs 23:05fc7de97d4a 123 * @param limits an instance of the Limit class - to alter limits as required
icraggs 23:05fc7de97d4a 124 */
icraggs 43:21da1f744243 125 Client(Network& network, unsigned int command_timeout_ms = 30000);
icraggs 43:21da1f744243 126
icraggs 20:cad3d54d7ecf 127 /** Set the default message handling callback - used for any message which does not match a subscription message handler
icraggs 53:15b5a280d22d 128 * @param mh - pointer to the callback function. Set to 0 to remove.
icraggs 20:cad3d54d7ecf 129 */
icraggs 20:cad3d54d7ecf 130 void setDefaultMessageHandler(messageHandler mh)
icraggs 20:cad3d54d7ecf 131 {
icraggs 53:15b5a280d22d 132 if (mh != 0)
icraggs 53:15b5a280d22d 133 defaultMessageHandler.attach(mh);
icraggs 53:15b5a280d22d 134 else
icraggs 53:15b5a280d22d 135 defaultMessageHandler.detach();
icraggs 20:cad3d54d7ecf 136 }
icraggs 42:f5beda831651 137
icraggs 53:15b5a280d22d 138 /** Set a message handling callback. This can be used outside of the the subscribe method.
icraggs 53:15b5a280d22d 139 * @param topicFilter - a topic pattern which can include wildcards
icraggs 53:15b5a280d22d 140 * @param mh - pointer to the callback function. If 0, removes the callback if any
icraggs 53:15b5a280d22d 141 */
icraggs 53:15b5a280d22d 142 int setMessageHandler(const char* topicFilter, messageHandler mh);
icraggs 53:15b5a280d22d 143
icraggs 43:21da1f744243 144 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
icraggs 43:21da1f744243 145 * The nework object must be connected to the network endpoint before calling this
icraggs 43:21da1f744243 146 * Default connect options are used
icraggs 43:21da1f744243 147 * @return success code -
icraggs 43:21da1f744243 148 */
icraggs 43:21da1f744243 149 int connect();
icraggs 53:15b5a280d22d 150
icraggs 53:15b5a280d22d 151 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
icraggs 43:21da1f744243 152 * The nework object must be connected to the network endpoint before calling this
icraggs 20:cad3d54d7ecf 153 * @param options - connect options
icraggs 43:21da1f744243 154 * @return success code -
icraggs 43:21da1f744243 155 */
icraggs 43:21da1f744243 156 int connect(MQTTPacket_connectData& options);
icraggs 43:21da1f744243 157
icraggs 53:15b5a280d22d 158 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
icraggs 53:15b5a280d22d 159 * The nework object must be connected to the network endpoint before calling this
icraggs 53:15b5a280d22d 160 * @param options - connect options
icraggs 53:15b5a280d22d 161 * @param connackData - connack data to be returned
icraggs 53:15b5a280d22d 162 * @return success code -
icraggs 53:15b5a280d22d 163 */
icraggs 53:15b5a280d22d 164 int connect(MQTTPacket_connectData& options, connackData& data);
icraggs 53:15b5a280d22d 165
icraggs 20:cad3d54d7ecf 166 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
icraggs 20:cad3d54d7ecf 167 * @param topic - the topic to publish to
icraggs 20:cad3d54d7ecf 168 * @param message - the message to send
icraggs 43:21da1f744243 169 * @return success code -
icraggs 43:21da1f744243 170 */
icraggs 43:21da1f744243 171 int publish(const char* topicName, Message& message);
icraggs 53:15b5a280d22d 172
icraggs 43:21da1f744243 173 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
icraggs 43:21da1f744243 174 * @param topic - the topic to publish to
icraggs 43:21da1f744243 175 * @param payload - the data to send
icraggs 43:21da1f744243 176 * @param payloadlen - the length of the data
icraggs 43:21da1f744243 177 * @param qos - the QoS to send the publish at
icraggs 43:21da1f744243 178 * @param retained - whether the message should be retained
icraggs 43:21da1f744243 179 * @return success code -
icraggs 43:21da1f744243 180 */
icraggs 43:21da1f744243 181 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
icraggs 53:15b5a280d22d 182
icraggs 43:21da1f744243 183 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
icraggs 43:21da1f744243 184 * @param topic - the topic to publish to
icraggs 43:21da1f744243 185 * @param payload - the data to send
icraggs 43:21da1f744243 186 * @param payloadlen - the length of the data
icraggs 53:15b5a280d22d 187 * @param id - the packet id used - returned
icraggs 43:21da1f744243 188 * @param qos - the QoS to send the publish at
icraggs 43:21da1f744243 189 * @param retained - whether the message should be retained
icraggs 43:21da1f744243 190 * @return success code -
icraggs 43:21da1f744243 191 */
icraggs 43:21da1f744243 192 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
icraggs 43:21da1f744243 193
icraggs 20:cad3d54d7ecf 194 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
icraggs 20:cad3d54d7ecf 195 * @param topicFilter - a topic pattern which can include wildcards
icraggs 20:cad3d54d7ecf 196 * @param qos - the MQTT QoS to subscribe at
icraggs 20:cad3d54d7ecf 197 * @param mh - the callback function to be invoked when a message is received for this subscription
icraggs 43:21da1f744243 198 * @return success code -
icraggs 43:21da1f744243 199 */
icraggs 20:cad3d54d7ecf 200 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
icraggs 43:21da1f744243 201
icraggs 53:15b5a280d22d 202 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
icraggs 53:15b5a280d22d 203 * @param topicFilter - a topic pattern which can include wildcards
icraggs 53:15b5a280d22d 204 * @param qos - the MQTT QoS to subscribe at©
icraggs 53:15b5a280d22d 205 * @param mh - the callback function to be invoked when a message is received for this subscription
icraggs 53:15b5a280d22d 206 * @param
icraggs 53:15b5a280d22d 207 * @return success code -
icraggs 53:15b5a280d22d 208 */
icraggs 53:15b5a280d22d 209 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);
icraggs 53:15b5a280d22d 210
icraggs 20:cad3d54d7ecf 211 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
icraggs 20:cad3d54d7ecf 212 * @param topicFilter - a topic pattern which can include wildcards
icraggs 43:21da1f744243 213 * @return success code -
icraggs 43:21da1f744243 214 */
icraggs 20:cad3d54d7ecf 215 int unsubscribe(const char* topicFilter);
icraggs 43:21da1f744243 216
icraggs 31:a51dd239b78e 217 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
icraggs 43:21da1f744243 218 * @return success code -
icraggs 20:cad3d54d7ecf 219 */
icraggs 20:cad3d54d7ecf 220 int disconnect();
icraggs 43:21da1f744243 221
icraggs 20:cad3d54d7ecf 222 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
icraggs 43:21da1f744243 223 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
icraggs 20:cad3d54d7ecf 224 * received.
icraggs 23:05fc7de97d4a 225 * @param timeout_ms the time to wait, in milliseconds
icraggs 26:2658bb87c53d 226 * @return success code - on failure, this means the client has disconnected
icraggs 20:cad3d54d7ecf 227 */
icraggs 43:21da1f744243 228 int yield(unsigned long timeout_ms = 1000L);
icraggs 43:21da1f744243 229
icraggs 43:21da1f744243 230 /** Is the client connected?
icraggs 43:21da1f744243 231 * @return flag - is the client connected or not?
icraggs 43:21da1f744243 232 */
icraggs 43:21da1f744243 233 bool isConnected()
icraggs 43:21da1f744243 234 {
icraggs 43:21da1f744243 235 return isconnected;
icraggs 43:21da1f744243 236 }
icraggs 43:21da1f744243 237
icraggs 20:cad3d54d7ecf 238 private:
icraggs 20:cad3d54d7ecf 239
icraggs 53:15b5a280d22d 240 void closeSession();
icraggs 46:e335fcc1a663 241 void cleanSession();
icraggs 20:cad3d54d7ecf 242 int cycle(Timer& timer);
icraggs 20:cad3d54d7ecf 243 int waitfor(int packet_type, Timer& timer);
icraggs 20:cad3d54d7ecf 244 int keepalive();
icraggs 43:21da1f744243 245 int publish(int len, Timer& timer, enum QoS qos);
icraggs 2:dcfdd2abfe71 246
icraggs 3:dbff6b768d28 247 int decodePacket(int* value, int timeout);
icraggs 20:cad3d54d7ecf 248 int readPacket(Timer& timer);
icraggs 20:cad3d54d7ecf 249 int sendPacket(int length, Timer& timer);
icraggs 26:2658bb87c53d 250 int deliverMessage(MQTTString& topicName, Message& message);
icraggs 26:2658bb87c53d 251 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
icraggs 43:21da1f744243 252
icraggs 23:05fc7de97d4a 253 Network& ipstack;
icraggs 43:21da1f744243 254 unsigned long command_timeout_ms;
icraggs 15:64a57183aa03 255
icraggs 43:21da1f744243 256 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
icraggs 43:21da1f744243 257 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
icraggs 43:21da1f744243 258
icraggs 43:21da1f744243 259 Timer last_sent, last_received;
icraggs 15:64a57183aa03 260 unsigned int keepAliveInterval;
icraggs 20:cad3d54d7ecf 261 bool ping_outstanding;
icraggs 43:21da1f744243 262 bool cleansession;
icraggs 43:21da1f744243 263
icraggs 9:01b8cc7d94cc 264 PacketId packetid;
icraggs 43:21da1f744243 265
icraggs 16:91c2f9a144d4 266 struct MessageHandlers
icraggs 15:64a57183aa03 267 {
icraggs 26:2658bb87c53d 268 const char* topicFilter;
icraggs 31:a51dd239b78e 269 FP<void, MessageData&> fp;
icraggs 23:05fc7de97d4a 270 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
icraggs 43:21da1f744243 271
icraggs 31:a51dd239b78e 272 FP<void, MessageData&> defaultMessageHandler;
icraggs 43:21da1f744243 273
icraggs 26:2658bb87c53d 274 bool isconnected;
icraggs 43:21da1f744243 275
icraggs 43:21da1f744243 276 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 43:21da1f744243 277 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect
icraggs 43:21da1f744243 278 int inflightLen;
icraggs 43:21da1f744243 279 unsigned short inflightMsgid;
icraggs 43:21da1f744243 280 enum QoS inflightQoS;
icraggs 43:21da1f744243 281 #endif
icraggs 43:21da1f744243 282
icraggs 43:21da1f744243 283 #if MQTTCLIENT_QOS2
icraggs 43:21da1f744243 284 bool pubrel;
icraggs 43:21da1f744243 285 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
icraggs 43:21da1f744243 286 #define MAX_INCOMING_QOS2_MESSAGES 10
icraggs 43:21da1f744243 287 #endif
icraggs 43:21da1f744243 288 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
icraggs 43:21da1f744243 289 bool isQoS2msgidFree(unsigned short id);
icraggs 43:21da1f744243 290 bool useQoS2msgid(unsigned short id);
icraggs 46:e335fcc1a663 291 void freeQoS2msgid(unsigned short id);
icraggs 31:a51dd239b78e 292 #endif
icraggs 28:8b2abe9bd814 293
sam_grove 0:fe461e4d7afe 294 };
sam_grove 0:fe461e4d7afe 295
icraggs 15:64a57183aa03 296 }
icraggs 15:64a57183aa03 297
icraggs 15:64a57183aa03 298
icraggs 43:21da1f744243 299 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
icraggs 53:15b5a280d22d 300 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
icraggs 46:e335fcc1a663 301 {
icraggs 46:e335fcc1a663 302 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
icraggs 46:e335fcc1a663 303 messageHandlers[i].topicFilter = 0;
icraggs 46:e335fcc1a663 304
icraggs 46:e335fcc1a663 305 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 46:e335fcc1a663 306 inflightMsgid = 0;
icraggs 46:e335fcc1a663 307 inflightQoS = QOS0;
icraggs 46:e335fcc1a663 308 #endif
icraggs 46:e335fcc1a663 309
icraggs 46:e335fcc1a663 310 #if MQTTCLIENT_QOS2
icraggs 46:e335fcc1a663 311 pubrel = false;
icraggs 46:e335fcc1a663 312 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
icraggs 46:e335fcc1a663 313 incomingQoS2messages[i] = 0;
icraggs 46:e335fcc1a663 314 #endif
icraggs 46:e335fcc1a663 315 }
icraggs 46:e335fcc1a663 316
icraggs 46:e335fcc1a663 317
icraggs 46:e335fcc1a663 318 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
icraggs 53:15b5a280d22d 319 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
icraggs 53:15b5a280d22d 320 {
icraggs 53:15b5a280d22d 321 ping_outstanding = false;
icraggs 53:15b5a280d22d 322 isconnected = false;
icraggs 53:15b5a280d22d 323 if (cleansession)
icraggs 53:15b5a280d22d 324 cleanSession();
icraggs 53:15b5a280d22d 325 }
icraggs 53:15b5a280d22d 326
icraggs 53:15b5a280d22d 327
icraggs 53:15b5a280d22d 328 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
icraggs 23:05fc7de97d4a 329 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
icraggs 15:64a57183aa03 330 {
icraggs 43:21da1f744243 331 this->command_timeout_ms = command_timeout_ms;
icraggs 53:15b5a280d22d 332 cleansession = true;
icraggs 53:15b5a280d22d 333 closeSession();
icraggs 46:e335fcc1a663 334 }
icraggs 43:21da1f744243 335
icraggs 43:21da1f744243 336
icraggs 43:21da1f744243 337 #if MQTTCLIENT_QOS2
icraggs 43:21da1f744243 338 template<class Network, class Timer, int a, int b>
icraggs 43:21da1f744243 339 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
icraggs 43:21da1f744243 340 {
icraggs 43:21da1f744243 341 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
icraggs 43:21da1f744243 342 {
icraggs 43:21da1f744243 343 if (incomingQoS2messages[i] == id)
icraggs 43:21da1f744243 344 return false;
icraggs 43:21da1f744243 345 }
icraggs 43:21da1f744243 346 return true;
icraggs 11:db15da110a37 347 }
icraggs 11:db15da110a37 348
icraggs 11:db15da110a37 349
icraggs 43:21da1f744243 350 template<class Network, class Timer, int a, int b>
icraggs 43:21da1f744243 351 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
icraggs 43:21da1f744243 352 {
icraggs 43:21da1f744243 353 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
icraggs 43:21da1f744243 354 {
icraggs 43:21da1f744243 355 if (incomingQoS2messages[i] == 0)
icraggs 43:21da1f744243 356 {
icraggs 43:21da1f744243 357 incomingQoS2messages[i] = id;
icraggs 43:21da1f744243 358 return true;
icraggs 43:21da1f744243 359 }
icraggs 43:21da1f744243 360 }
icraggs 43:21da1f744243 361 return false;
icraggs 43:21da1f744243 362 }
icraggs 46:e335fcc1a663 363
icraggs 46:e335fcc1a663 364
icraggs 46:e335fcc1a663 365 template<class Network, class Timer, int a, int b>
icraggs 46:e335fcc1a663 366 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
icraggs 46:e335fcc1a663 367 {
icraggs 46:e335fcc1a663 368 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
icraggs 46:e335fcc1a663 369 {
icraggs 46:e335fcc1a663 370 if (incomingQoS2messages[i] == id)
icraggs 46:e335fcc1a663 371 {
icraggs 46:e335fcc1a663 372 incomingQoS2messages[i] = 0;
icraggs 46:e335fcc1a663 373 return;
icraggs 46:e335fcc1a663 374 }
icraggs 46:e335fcc1a663 375 }
icraggs 46:e335fcc1a663 376 }
icraggs 43:21da1f744243 377 #endif
icraggs 43:21da1f744243 378
icraggs 43:21da1f744243 379
icraggs 43:21da1f744243 380 template<class Network, class Timer, int a, int b>
icraggs 23:05fc7de97d4a 381 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
icraggs 8:c46930bd6c82 382 {
icraggs 43:21da1f744243 383 int rc = FAILURE,
icraggs 23:05fc7de97d4a 384 sent = 0;
icraggs 43:21da1f744243 385
Ian Craggs 57:3513ee54ebb4 386 while (sent < length)
icraggs 23:05fc7de97d4a 387 {
icraggs 46:e335fcc1a663 388 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
icraggs 26:2658bb87c53d 389 if (rc < 0) // there was an error writing the data
icraggs 26:2658bb87c53d 390 break;
icraggs 26:2658bb87c53d 391 sent += rc;
Ian Craggs 57:3513ee54ebb4 392 if (timer.expired()) // only check expiry after at least one attempt to write
Ian Craggs 57:3513ee54ebb4 393 break;
Ian Craggs 57:3513ee54ebb4 394 }
icraggs 20:cad3d54d7ecf 395 if (sent == length)
icraggs 23:05fc7de97d4a 396 {
icraggs 43:21da1f744243 397 if (this->keepAliveInterval > 0)
icraggs 43:21da1f744243 398 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
icraggs 23:05fc7de97d4a 399 rc = SUCCESS;
icraggs 23:05fc7de97d4a 400 }
icraggs 23:05fc7de97d4a 401 else
icraggs 23:05fc7de97d4a 402 rc = FAILURE;
icraggs 53:15b5a280d22d 403
icraggs 43:21da1f744243 404 #if defined(MQTT_DEBUG)
icraggs 46:e335fcc1a663 405 char printbuf[150];
icraggs 56:71ae1a773b64 406 DEBUG("Rc %d from sending packet %s\r\n", rc,
icraggs 54:ff9e5c4b52d0 407 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
icraggs 43:21da1f744243 408 #endif
icraggs 23:05fc7de97d4a 409 return rc;
icraggs 8:c46930bd6c82 410 }
icraggs 8:c46930bd6c82 411
icraggs 8:c46930bd6c82 412
icraggs 43:21da1f744243 413 template<class Network, class Timer, int a, int b>
icraggs 26:2658bb87c53d 414 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
icraggs 8:c46930bd6c82 415 {
icraggs 36:2f1ada427e56 416 unsigned char c;
icraggs 8:c46930bd6c82 417 int multiplier = 1;
icraggs 8:c46930bd6c82 418 int len = 0;
icraggs 20:cad3d54d7ecf 419 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
icraggs 8:c46930bd6c82 420
icraggs 8:c46930bd6c82 421 *value = 0;
icraggs 8:c46930bd6c82 422 do
icraggs 8:c46930bd6c82 423 {
icraggs 8:c46930bd6c82 424 int rc = MQTTPACKET_READ_ERROR;
icraggs 8:c46930bd6c82 425
icraggs 8:c46930bd6c82 426 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 8:c46930bd6c82 427 {
icraggs 8:c46930bd6c82 428 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 8:c46930bd6c82 429 goto exit;
icraggs 8:c46930bd6c82 430 }
icraggs 23:05fc7de97d4a 431 rc = ipstack.read(&c, 1, timeout);
icraggs 8:c46930bd6c82 432 if (rc != 1)
icraggs 8:c46930bd6c82 433 goto exit;
icraggs 8:c46930bd6c82 434 *value += (c & 127) * multiplier;
icraggs 8:c46930bd6c82 435 multiplier *= 128;
icraggs 8:c46930bd6c82 436 } while ((c & 128) != 0);
icraggs 8:c46930bd6c82 437 exit:
icraggs 8:c46930bd6c82 438 return len;
icraggs 8:c46930bd6c82 439 }
icraggs 8:c46930bd6c82 440
icraggs 8:c46930bd6c82 441
icraggs 8:c46930bd6c82 442 /**
icraggs 8:c46930bd6c82 443 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 43:21da1f744243 444 * the packets can be retried.
icraggs 8:c46930bd6c82 445 * @param timeout the max time to wait for the packet read to complete, in milliseconds
icraggs 53:15b5a280d22d 446 * @return the MQTT packet type, 0 if none, -1 if error
icraggs 8:c46930bd6c82 447 */
icraggs 46:e335fcc1a663 448 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 46:e335fcc1a663 449 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
icraggs 8:c46930bd6c82 450 {
icraggs 26:2658bb87c53d 451 int rc = FAILURE;
icraggs 8:c46930bd6c82 452 MQTTHeader header = {0};
icraggs 8:c46930bd6c82 453 int len = 0;
icraggs 8:c46930bd6c82 454 int rem_len = 0;
icraggs 8:c46930bd6c82 455
icraggs 8:c46930bd6c82 456 /* 1. read the header byte. This has the packet type in it */
icraggs 53:15b5a280d22d 457 rc = ipstack.read(readbuf, 1, timer.left_ms());
icraggs 53:15b5a280d22d 458 if (rc != 1)
icraggs 8:c46930bd6c82 459 goto exit;
icraggs 8:c46930bd6c82 460
icraggs 8:c46930bd6c82 461 len = 1;
icraggs 8:c46930bd6c82 462 /* 2. read the remaining length. This is variable in itself */
icraggs 20:cad3d54d7ecf 463 decodePacket(&rem_len, timer.left_ms());
icraggs 43:21da1f744243 464 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
icraggs 8:c46930bd6c82 465
icraggs 46:e335fcc1a663 466 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
icraggs 46:e335fcc1a663 467 {
icraggs 46:e335fcc1a663 468 rc = BUFFER_OVERFLOW;
icraggs 46:e335fcc1a663 469 goto exit;
icraggs 46:e335fcc1a663 470 }
icraggs 46:e335fcc1a663 471
icraggs 8:c46930bd6c82 472 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 43:21da1f744243 473 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
icraggs 8:c46930bd6c82 474 goto exit;
icraggs 8:c46930bd6c82 475
icraggs 8:c46930bd6c82 476 header.byte = readbuf[0];
icraggs 8:c46930bd6c82 477 rc = header.bits.type;
icraggs 43:21da1f744243 478 if (this->keepAliveInterval > 0)
icraggs 43:21da1f744243 479 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
icraggs 8:c46930bd6c82 480 exit:
icraggs 53:15b5a280d22d 481
icraggs 43:21da1f744243 482 #if defined(MQTT_DEBUG)
icraggs 46:e335fcc1a663 483 if (rc >= 0)
icraggs 46:e335fcc1a663 484 {
icraggs 46:e335fcc1a663 485 char printbuf[50];
icraggs 56:71ae1a773b64 486 DEBUG("Rc %d receiving packet %s\r\n", rc,
icraggs 53:15b5a280d22d 487 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
icraggs 46:e335fcc1a663 488 }
icraggs 43:21da1f744243 489 #endif
icraggs 8:c46930bd6c82 490 return rc;
icraggs 3:dbff6b768d28 491 }
icraggs 3:dbff6b768d28 492
icraggs 8:c46930bd6c82 493
icraggs 26:2658bb87c53d 494 // assume topic filter and name is in correct format
icraggs 26:2658bb87c53d 495 // # can only be at end
icraggs 26:2658bb87c53d 496 // + and # can only be next to separator
icraggs 43:21da1f744243 497 template<class Network, class Timer, int a, int b>
icraggs 26:2658bb87c53d 498 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
icraggs 26:2658bb87c53d 499 {
icraggs 26:2658bb87c53d 500 char* curf = topicFilter;
icraggs 26:2658bb87c53d 501 char* curn = topicName.lenstring.data;
icraggs 26:2658bb87c53d 502 char* curn_end = curn + topicName.lenstring.len;
icraggs 43:21da1f744243 503
icraggs 26:2658bb87c53d 504 while (*curf && curn < curn_end)
icraggs 26:2658bb87c53d 505 {
icraggs 26:2658bb87c53d 506 if (*curn == '/' && *curf != '/')
icraggs 26:2658bb87c53d 507 break;
icraggs 26:2658bb87c53d 508 if (*curf != '+' && *curf != '#' && *curf != *curn)
icraggs 26:2658bb87c53d 509 break;
icraggs 26:2658bb87c53d 510 if (*curf == '+')
icraggs 26:2658bb87c53d 511 { // skip until we meet the next separator, or end of string
icraggs 26:2658bb87c53d 512 char* nextpos = curn + 1;
icraggs 26:2658bb87c53d 513 while (nextpos < curn_end && *nextpos != '/')
icraggs 26:2658bb87c53d 514 nextpos = ++curn + 1;
icraggs 26:2658bb87c53d 515 }
icraggs 26:2658bb87c53d 516 else if (*curf == '#')
icraggs 26:2658bb87c53d 517 curn = curn_end - 1; // skip until end of string
icraggs 26:2658bb87c53d 518 curf++;
icraggs 26:2658bb87c53d 519 curn++;
icraggs 26:2658bb87c53d 520 };
icraggs 43:21da1f744243 521
icraggs 26:2658bb87c53d 522 return (curn == curn_end) && (*curf == '\0');
icraggs 26:2658bb87c53d 523 }
icraggs 26:2658bb87c53d 524
icraggs 26:2658bb87c53d 525
icraggs 26:2658bb87c53d 526
icraggs 43:21da1f744243 527 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
icraggs 26:2658bb87c53d 528 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
icraggs 15:64a57183aa03 529 {
icraggs 26:2658bb87c53d 530 int rc = FAILURE;
icraggs 20:cad3d54d7ecf 531
icraggs 20:cad3d54d7ecf 532 // we have to find the right message handler - indexed by topic
icraggs 23:05fc7de97d4a 533 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
icraggs 20:cad3d54d7ecf 534 {
icraggs 26:2658bb87c53d 535 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
icraggs 26:2658bb87c53d 536 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
icraggs 20:cad3d54d7ecf 537 {
icraggs 26:2658bb87c53d 538 if (messageHandlers[i].fp.attached())
icraggs 26:2658bb87c53d 539 {
icraggs 31:a51dd239b78e 540 MessageData md(topicName, message);
icraggs 31:a51dd239b78e 541 messageHandlers[i].fp(md);
icraggs 26:2658bb87c53d 542 rc = SUCCESS;
icraggs 26:2658bb87c53d 543 }
icraggs 20:cad3d54d7ecf 544 }
icraggs 20:cad3d54d7ecf 545 }
icraggs 43:21da1f744243 546
icraggs 43:21da1f744243 547 if (rc == FAILURE && defaultMessageHandler.attached())
icraggs 26:2658bb87c53d 548 {
icraggs 31:a51dd239b78e 549 MessageData md(topicName, message);
icraggs 31:a51dd239b78e 550 defaultMessageHandler(md);
icraggs 26:2658bb87c53d 551 rc = SUCCESS;
icraggs 43:21da1f744243 552 }
icraggs 43:21da1f744243 553
icraggs 20:cad3d54d7ecf 554 return rc;
icraggs 15:64a57183aa03 555 }
icraggs 15:64a57183aa03 556
icraggs 15:64a57183aa03 557
icraggs 20:cad3d54d7ecf 558
icraggs 43:21da1f744243 559 template<class Network, class Timer, int a, int b>
icraggs 43:21da1f744243 560 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
icraggs 20:cad3d54d7ecf 561 {
icraggs 26:2658bb87c53d 562 int rc = SUCCESS;
icraggs 53:15b5a280d22d 563 Timer timer;
icraggs 43:21da1f744243 564
icraggs 23:05fc7de97d4a 565 timer.countdown_ms(timeout_ms);
icraggs 20:cad3d54d7ecf 566 while (!timer.expired())
icraggs 26:2658bb87c53d 567 {
icraggs 46:e335fcc1a663 568 if (cycle(timer) < 0)
icraggs 26:2658bb87c53d 569 {
icraggs 26:2658bb87c53d 570 rc = FAILURE;
icraggs 26:2658bb87c53d 571 break;
icraggs 26:2658bb87c53d 572 }
icraggs 26:2658bb87c53d 573 }
icraggs 43:21da1f744243 574
icraggs 26:2658bb87c53d 575 return rc;
icraggs 20:cad3d54d7ecf 576 }
icraggs 20:cad3d54d7ecf 577
icraggs 20:cad3d54d7ecf 578
icraggs 43:21da1f744243 579 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 23:05fc7de97d4a 580 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
icraggs 8:c46930bd6c82 581 {
icraggs 53:15b5a280d22d 582 // get one piece of work off the wire and one pass through
icraggs 26:2658bb87c53d 583 int len = 0,
icraggs 26:2658bb87c53d 584 rc = SUCCESS;
icraggs 28:8b2abe9bd814 585
icraggs 53:15b5a280d22d 586 int packet_type = readPacket(timer); // read the socket, see what work is due
icraggs 53:15b5a280d22d 587
icraggs 8:c46930bd6c82 588 switch (packet_type)
icraggs 8:c46930bd6c82 589 {
icraggs 53:15b5a280d22d 590 default:
icraggs 53:15b5a280d22d 591 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
icraggs 46:e335fcc1a663 592 rc = packet_type;
icraggs 53:15b5a280d22d 593 goto exit;
icraggs 53:15b5a280d22d 594 case 0: // timed out reading packet
icraggs 46:e335fcc1a663 595 break;
icraggs 15:64a57183aa03 596 case CONNACK:
icraggs 8:c46930bd6c82 597 case PUBACK:
icraggs 8:c46930bd6c82 598 case SUBACK:
icraggs 8:c46930bd6c82 599 break;
icraggs 15:64a57183aa03 600 case PUBLISH:
icraggs 46:e335fcc1a663 601 {
icraggs 46:e335fcc1a663 602 MQTTString topicName = MQTTString_initializer;
icraggs 20:cad3d54d7ecf 603 Message msg;
icraggs 46:e335fcc1a663 604 int intQoS;
icraggs 53:15b5a280d22d 605 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
icraggs 46:e335fcc1a663 606 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
icraggs 36:2f1ada427e56 607 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
icraggs 26:2658bb87c53d 608 goto exit;
icraggs 46:e335fcc1a663 609 msg.qos = (enum QoS)intQoS;
icraggs 43:21da1f744243 610 #if MQTTCLIENT_QOS2
icraggs 43:21da1f744243 611 if (msg.qos != QOS2)
icraggs 43:21da1f744243 612 #endif
icraggs 31:a51dd239b78e 613 deliverMessage(topicName, msg);
icraggs 43:21da1f744243 614 #if MQTTCLIENT_QOS2
icraggs 31:a51dd239b78e 615 else if (isQoS2msgidFree(msg.id))
icraggs 31:a51dd239b78e 616 {
icraggs 43:21da1f744243 617 if (useQoS2msgid(msg.id))
icraggs 43:21da1f744243 618 deliverMessage(topicName, msg);
icraggs 43:21da1f744243 619 else
icraggs 43:21da1f744243 620 WARN("Maximum number of incoming QoS2 messages exceeded");
icraggs 46:e335fcc1a663 621 }
icraggs 31:a51dd239b78e 622 #endif
icraggs 43:21da1f744243 623 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 20:cad3d54d7ecf 624 if (msg.qos != QOS0)
icraggs 20:cad3d54d7ecf 625 {
icraggs 20:cad3d54d7ecf 626 if (msg.qos == QOS1)
icraggs 43:21da1f744243 627 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
icraggs 20:cad3d54d7ecf 628 else if (msg.qos == QOS2)
icraggs 43:21da1f744243 629 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
icraggs 26:2658bb87c53d 630 if (len <= 0)
icraggs 26:2658bb87c53d 631 rc = FAILURE;
icraggs 26:2658bb87c53d 632 else
icraggs 26:2658bb87c53d 633 rc = sendPacket(len, timer);
icraggs 26:2658bb87c53d 634 if (rc == FAILURE)
icraggs 20:cad3d54d7ecf 635 goto exit; // there was a problem
icraggs 20:cad3d54d7ecf 636 }
Ian Craggs 12:cc7f2d62a393 637 break;
icraggs 43:21da1f744243 638 #endif
icraggs 46:e335fcc1a663 639 }
icraggs 43:21da1f744243 640 #if MQTTCLIENT_QOS2
icraggs 15:64a57183aa03 641 case PUBREC:
icraggs 46:e335fcc1a663 642 case PUBREL:
icraggs 36:2f1ada427e56 643 unsigned short mypacketid;
icraggs 36:2f1ada427e56 644 unsigned char dup, type;
icraggs 26:2658bb87c53d 645 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
icraggs 26:2658bb87c53d 646 rc = FAILURE;
icraggs 53:15b5a280d22d 647 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
icraggs 53:15b5a280d22d 648 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
icraggs 26:2658bb87c53d 649 rc = FAILURE;
icraggs 26:2658bb87c53d 650 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
icraggs 26:2658bb87c53d 651 rc = FAILURE; // there was a problem
icraggs 26:2658bb87c53d 652 if (rc == FAILURE)
icraggs 20:cad3d54d7ecf 653 goto exit; // there was a problem
icraggs 46:e335fcc1a663 654 if (packet_type == PUBREL)
icraggs 46:e335fcc1a663 655 freeQoS2msgid(mypacketid);
icraggs 8:c46930bd6c82 656 break;
icraggs 53:15b5a280d22d 657
icraggs 8:c46930bd6c82 658 case PUBCOMP:
icraggs 8:c46930bd6c82 659 break;
icraggs 43:21da1f744243 660 #endif
icraggs 15:64a57183aa03 661 case PINGRESP:
icraggs 20:cad3d54d7ecf 662 ping_outstanding = false;
icraggs 8:c46930bd6c82 663 break;
icraggs 15:64a57183aa03 664 }
icraggs 53:15b5a280d22d 665
icraggs 53:15b5a280d22d 666 if (keepalive() != SUCCESS)
icraggs 53:15b5a280d22d 667 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
icraggs 53:15b5a280d22d 668 rc = FAILURE;
icraggs 53:15b5a280d22d 669
Ian Craggs 12:cc7f2d62a393 670 exit:
icraggs 26:2658bb87c53d 671 if (rc == SUCCESS)
icraggs 26:2658bb87c53d 672 rc = packet_type;
icraggs 53:15b5a280d22d 673 else if (isconnected)
icraggs 53:15b5a280d22d 674 closeSession();
icraggs 26:2658bb87c53d 675 return rc;
icraggs 15:64a57183aa03 676 }
icraggs 15:64a57183aa03 677
icraggs 15:64a57183aa03 678
icraggs 23:05fc7de97d4a 679 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 23:05fc7de97d4a 680 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
icraggs 15:64a57183aa03 681 {
icraggs 53:15b5a280d22d 682 int rc = SUCCESS;
icraggs 54:ff9e5c4b52d0 683 static Timer ping_sent;
icraggs 15:64a57183aa03 684
icraggs 20:cad3d54d7ecf 685 if (keepAliveInterval == 0)
icraggs 20:cad3d54d7ecf 686 goto exit;
icraggs 54:ff9e5c4b52d0 687
icraggs 54:ff9e5c4b52d0 688 if (ping_outstanding)
icraggs 20:cad3d54d7ecf 689 {
icraggs 54:ff9e5c4b52d0 690 if (ping_sent.expired())
icraggs 53:15b5a280d22d 691 {
icraggs 53:15b5a280d22d 692 rc = FAILURE; // session failure
icraggs 53:15b5a280d22d 693 #if defined(MQTT_DEBUG)
icraggs 56:71ae1a773b64 694 DEBUG("PINGRESP not received in keepalive interval\r\n");
icraggs 53:15b5a280d22d 695 #endif
icraggs 53:15b5a280d22d 696 }
icraggs 54:ff9e5c4b52d0 697 }
icraggs 54:ff9e5c4b52d0 698 else if (last_sent.expired() || last_received.expired())
icraggs 54:ff9e5c4b52d0 699 {
icraggs 54:ff9e5c4b52d0 700 Timer timer(1000);
icraggs 54:ff9e5c4b52d0 701 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
icraggs 54:ff9e5c4b52d0 702 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
icraggs 20:cad3d54d7ecf 703 {
icraggs 54:ff9e5c4b52d0 704 ping_outstanding = true;
icraggs 54:ff9e5c4b52d0 705 ping_sent.countdown(this->keepAliveInterval);
icraggs 20:cad3d54d7ecf 706 }
icraggs 20:cad3d54d7ecf 707 }
icraggs 15:64a57183aa03 708 exit:
icraggs 20:cad3d54d7ecf 709 return rc;
icraggs 8:c46930bd6c82 710 }
icraggs 8:c46930bd6c82 711
icraggs 8:c46930bd6c82 712
icraggs 16:91c2f9a144d4 713 // only used in single-threaded mode where one command at a time is in process
icraggs 43:21da1f744243 714 template<class Network, class Timer, int a, int b>
icraggs 23:05fc7de97d4a 715 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
icraggs 15:64a57183aa03 716 {
icraggs 26:2658bb87c53d 717 int rc = FAILURE;
icraggs 43:21da1f744243 718
icraggs 20:cad3d54d7ecf 719 do
icraggs 16:91c2f9a144d4 720 {
icraggs 43:21da1f744243 721 if (timer.expired())
icraggs 20:cad3d54d7ecf 722 break; // we timed out
icraggs 53:15b5a280d22d 723 rc = cycle(timer);
icraggs 20:cad3d54d7ecf 724 }
icraggs 53:15b5a280d22d 725 while (rc != packet_type && rc >= 0);
icraggs 43:21da1f744243 726
icraggs 20:cad3d54d7ecf 727 return rc;
icraggs 16:91c2f9a144d4 728 }
icraggs 16:91c2f9a144d4 729
icraggs 16:91c2f9a144d4 730
icraggs 43:21da1f744243 731 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 53:15b5a280d22d 732 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
icraggs 16:91c2f9a144d4 733 {
icraggs 46:e335fcc1a663 734 Timer connect_timer(command_timeout_ms);
icraggs 26:2658bb87c53d 735 int rc = FAILURE;
icraggs 31:a51dd239b78e 736 int len = 0;
icraggs 43:21da1f744243 737
icraggs 31:a51dd239b78e 738 if (isconnected) // don't send connect packet again if we are already connected
icraggs 31:a51dd239b78e 739 goto exit;
icraggs 8:c46930bd6c82 740
icraggs 43:21da1f744243 741 this->keepAliveInterval = options.keepAliveInterval;
icraggs 43:21da1f744243 742 this->cleansession = options.cleansession;
icraggs 43:21da1f744243 743 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
icraggs 26:2658bb87c53d 744 goto exit;
icraggs 26:2658bb87c53d 745 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
icraggs 20:cad3d54d7ecf 746 goto exit; // there was a problem
icraggs 43:21da1f744243 747
icraggs 43:21da1f744243 748 if (this->keepAliveInterval > 0)
icraggs 43:21da1f744243 749 last_received.countdown(this->keepAliveInterval);
icraggs 20:cad3d54d7ecf 750 // this will be a blocking call, wait for the connack
icraggs 20:cad3d54d7ecf 751 if (waitfor(CONNACK, connect_timer) == CONNACK)
icraggs 15:64a57183aa03 752 {
icraggs 53:15b5a280d22d 753 data.rc = 0;
icraggs 53:15b5a280d22d 754 data.sessionPresent = false;
icraggs 53:15b5a280d22d 755 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
icraggs 53:15b5a280d22d 756 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 53:15b5a280d22d 757 rc = data.rc;
icraggs 26:2658bb87c53d 758 else
icraggs 26:2658bb87c53d 759 rc = FAILURE;
icraggs 8:c46930bd6c82 760 }
icraggs 26:2658bb87c53d 761 else
icraggs 26:2658bb87c53d 762 rc = FAILURE;
icraggs 46:e335fcc1a663 763
icraggs 43:21da1f744243 764 #if MQTTCLIENT_QOS2
icraggs 46:e335fcc1a663 765 // resend any inflight publish
icraggs 46:e335fcc1a663 766 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
icraggs 43:21da1f744243 767 {
icraggs 43:21da1f744243 768 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
icraggs 43:21da1f744243 769 rc = FAILURE;
icraggs 43:21da1f744243 770 else
icraggs 43:21da1f744243 771 rc = publish(len, connect_timer, inflightQoS);
icraggs 43:21da1f744243 772 }
icraggs 43:21da1f744243 773 else
icraggs 43:21da1f744243 774 #endif
icraggs 43:21da1f744243 775 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 43:21da1f744243 776 if (inflightMsgid > 0)
icraggs 43:21da1f744243 777 {
icraggs 43:21da1f744243 778 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
icraggs 43:21da1f744243 779 rc = publish(inflightLen, connect_timer, inflightQoS);
icraggs 43:21da1f744243 780 }
icraggs 43:21da1f744243 781 #endif
icraggs 43:21da1f744243 782
icraggs 15:64a57183aa03 783 exit:
icraggs 26:2658bb87c53d 784 if (rc == SUCCESS)
icraggs 53:15b5a280d22d 785 {
icraggs 26:2658bb87c53d 786 isconnected = true;
icraggs 53:15b5a280d22d 787 ping_outstanding = false;
icraggs 53:15b5a280d22d 788 }
icraggs 8:c46930bd6c82 789 return rc;
icraggs 8:c46930bd6c82 790 }
icraggs 8:c46930bd6c82 791
icraggs 8:c46930bd6c82 792
icraggs 43:21da1f744243 793 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 53:15b5a280d22d 794 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
icraggs 53:15b5a280d22d 795 {
icraggs 53:15b5a280d22d 796 connackData data;
icraggs 53:15b5a280d22d 797 return connect(options, data);
icraggs 53:15b5a280d22d 798 }
icraggs 53:15b5a280d22d 799
icraggs 53:15b5a280d22d 800
icraggs 53:15b5a280d22d 801 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 43:21da1f744243 802 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
icraggs 43:21da1f744243 803 {
icraggs 43:21da1f744243 804 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 43:21da1f744243 805 return connect(default_options);
icraggs 43:21da1f744243 806 }
icraggs 43:21da1f744243 807
icraggs 43:21da1f744243 808
icraggs 43:21da1f744243 809 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
icraggs 53:15b5a280d22d 810 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
icraggs 53:15b5a280d22d 811 {
icraggs 53:15b5a280d22d 812 int rc = FAILURE;
icraggs 53:15b5a280d22d 813 int i = -1;
icraggs 53:15b5a280d22d 814
icraggs 53:15b5a280d22d 815 // first check for an existing matching slot
icraggs 53:15b5a280d22d 816 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
icraggs 53:15b5a280d22d 817 {
icraggs 53:15b5a280d22d 818 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
icraggs 53:15b5a280d22d 819 {
icraggs 53:15b5a280d22d 820 if (messageHandler == 0) // remove existing
icraggs 53:15b5a280d22d 821 {
icraggs 53:15b5a280d22d 822 messageHandlers[i].topicFilter = 0;
icraggs 53:15b5a280d22d 823 messageHandlers[i].fp.detach();
icraggs 53:15b5a280d22d 824 }
icraggs 53:15b5a280d22d 825 rc = SUCCESS; // return i when adding new subscription
icraggs 53:15b5a280d22d 826 break;
icraggs 53:15b5a280d22d 827 }
icraggs 53:15b5a280d22d 828 }
icraggs 53:15b5a280d22d 829 // if no existing, look for empty slot (unless we are removing)
icraggs 53:15b5a280d22d 830 if (messageHandler != 0) {
icraggs 53:15b5a280d22d 831 if (rc == FAILURE)
icraggs 53:15b5a280d22d 832 {
icraggs 53:15b5a280d22d 833 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
icraggs 53:15b5a280d22d 834 {
icraggs 53:15b5a280d22d 835 if (messageHandlers[i].topicFilter == 0)
icraggs 53:15b5a280d22d 836 {
icraggs 53:15b5a280d22d 837 rc = SUCCESS;
icraggs 53:15b5a280d22d 838 break;
icraggs 53:15b5a280d22d 839 }
icraggs 53:15b5a280d22d 840 }
icraggs 53:15b5a280d22d 841 }
icraggs 53:15b5a280d22d 842 if (i < MAX_MESSAGE_HANDLERS)
icraggs 53:15b5a280d22d 843 {
icraggs 53:15b5a280d22d 844 messageHandlers[i].topicFilter = topicFilter;
icraggs 53:15b5a280d22d 845 messageHandlers[i].fp.attach(messageHandler);
icraggs 53:15b5a280d22d 846 }
icraggs 53:15b5a280d22d 847 }
icraggs 53:15b5a280d22d 848 return rc;
icraggs 53:15b5a280d22d 849 }
icraggs 53:15b5a280d22d 850
icraggs 53:15b5a280d22d 851
icraggs 53:15b5a280d22d 852 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
icraggs 53:15b5a280d22d 853 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
icraggs 53:15b5a280d22d 854 enum QoS qos, messageHandler messageHandler, subackData& data)
icraggs 43:21da1f744243 855 {
icraggs 43:21da1f744243 856 int rc = FAILURE;
icraggs 46:e335fcc1a663 857 Timer timer(command_timeout_ms);
icraggs 26:2658bb87c53d 858 int len = 0;
icraggs 46:e335fcc1a663 859 MQTTString topic = {(char*)topicFilter, {0, 0}};
icraggs 43:21da1f744243 860
icraggs 26:2658bb87c53d 861 if (!isconnected)
icraggs 20:cad3d54d7ecf 862 goto exit;
icraggs 43:21da1f744243 863
icraggs 43:21da1f744243 864 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
icraggs 26:2658bb87c53d 865 if (len <= 0)
icraggs 26:2658bb87c53d 866 goto exit;
icraggs 23:05fc7de97d4a 867 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
icraggs 26:2658bb87c53d 868 goto exit; // there was a problem
icraggs 43:21da1f744243 869
icraggs 43:21da1f744243 870 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
icraggs 8:c46930bd6c82 871 {
icraggs 53:15b5a280d22d 872 int count = 0;
icraggs 36:2f1ada427e56 873 unsigned short mypacketid;
icraggs 53:15b5a280d22d 874 data.grantedQoS = 0;
icraggs 53:15b5a280d22d 875 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 8:c46930bd6c82 876 {
icraggs 53:15b5a280d22d 877 if (data.grantedQoS != 0x80)
icraggs 53:15b5a280d22d 878 rc = setMessageHandler(topicFilter, messageHandler);
icraggs 8:c46930bd6c82 879 }
icraggs 8:c46930bd6c82 880 }
icraggs 43:21da1f744243 881 else
icraggs 26:2658bb87c53d 882 rc = FAILURE;
icraggs 43:21da1f744243 883
icraggs 15:64a57183aa03 884 exit:
icraggs 53:15b5a280d22d 885 if (rc == FAILURE)
icraggs 53:15b5a280d22d 886 closeSession();
Ian Craggs 12:cc7f2d62a393 887 return rc;
icraggs 15:64a57183aa03 888 }
icraggs 15:64a57183aa03 889
icraggs 15:64a57183aa03 890
icraggs 43:21da1f744243 891 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
icraggs 53:15b5a280d22d 892 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
icraggs 53:15b5a280d22d 893 {
icraggs 53:15b5a280d22d 894 subackData data;
icraggs 53:15b5a280d22d 895 return subscribe(topicFilter, qos, messageHandler, data);
icraggs 53:15b5a280d22d 896 }
icraggs 53:15b5a280d22d 897
icraggs 53:15b5a280d22d 898
icraggs 53:15b5a280d22d 899 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
icraggs 23:05fc7de97d4a 900 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
icraggs 43:21da1f744243 901 {
icraggs 26:2658bb87c53d 902 int rc = FAILURE;
icraggs 46:e335fcc1a663 903 Timer timer(command_timeout_ms);
icraggs 46:e335fcc1a663 904 MQTTString topic = {(char*)topicFilter, {0, 0}};
icraggs 31:a51dd239b78e 905 int len = 0;
icraggs 43:21da1f744243 906
icraggs 31:a51dd239b78e 907 if (!isconnected)
icraggs 31:a51dd239b78e 908 goto exit;
icraggs 43:21da1f744243 909
icraggs 43:21da1f744243 910 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
icraggs 20:cad3d54d7ecf 911 goto exit;
icraggs 43:21da1f744243 912 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
icraggs 20:cad3d54d7ecf 913 goto exit; // there was a problem
icraggs 43:21da1f744243 914
icraggs 20:cad3d54d7ecf 915 if (waitfor(UNSUBACK, timer) == UNSUBACK)
Ian Craggs 12:cc7f2d62a393 916 {
icraggs 36:2f1ada427e56 917 unsigned short mypacketid; // should be the same as the packetid above
icraggs 23:05fc7de97d4a 918 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 46:e335fcc1a663 919 {
icraggs 46:e335fcc1a663 920 // remove the subscription message handler associated with this topic, if there is one
icraggs 53:15b5a280d22d 921 setMessageHandler(topicFilter, 0);
icraggs 46:e335fcc1a663 922 }
Ian Craggs 12:cc7f2d62a393 923 }
icraggs 26:2658bb87c53d 924 else
icraggs 26:2658bb87c53d 925 rc = FAILURE;
icraggs 43:21da1f744243 926
icraggs 15:64a57183aa03 927 exit:
icraggs 43:21da1f744243 928 if (rc != SUCCESS)
icraggs 53:15b5a280d22d 929 closeSession();
Ian Craggs 12:cc7f2d62a393 930 return rc;
icraggs 15:64a57183aa03 931 }
icraggs 15:64a57183aa03 932
icraggs 15:64a57183aa03 933
icraggs 43:21da1f744243 934 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 43:21da1f744243 935 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
icraggs 20:cad3d54d7ecf 936 {
icraggs 43:21da1f744243 937 int rc;
icraggs 46:e335fcc1a663 938
icraggs 43:21da1f744243 939 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
icraggs 43:21da1f744243 940 goto exit; // there was a problem
icraggs 15:64a57183aa03 941
icraggs 46:e335fcc1a663 942 #if MQTTCLIENT_QOS1
icraggs 43:21da1f744243 943 if (qos == QOS1)
Ian Craggs 12:cc7f2d62a393 944 {
icraggs 20:cad3d54d7ecf 945 if (waitfor(PUBACK, timer) == PUBACK)
icraggs 20:cad3d54d7ecf 946 {
icraggs 36:2f1ada427e56 947 unsigned short mypacketid;
icraggs 36:2f1ada427e56 948 unsigned char dup, type;
icraggs 26:2658bb87c53d 949 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
icraggs 26:2658bb87c53d 950 rc = FAILURE;
icraggs 43:21da1f744243 951 else if (inflightMsgid == mypacketid)
icraggs 43:21da1f744243 952 inflightMsgid = 0;
icraggs 20:cad3d54d7ecf 953 }
icraggs 26:2658bb87c53d 954 else
icraggs 26:2658bb87c53d 955 rc = FAILURE;
Ian Craggs 12:cc7f2d62a393 956 }
icraggs 53:15b5a280d22d 957 #endif
icraggs 53:15b5a280d22d 958 #if MQTTCLIENT_QOS2
icraggs 43:21da1f744243 959 else if (qos == QOS2)
Ian Craggs 12:cc7f2d62a393 960 {
icraggs 20:cad3d54d7ecf 961 if (waitfor(PUBCOMP, timer) == PUBCOMP)
icraggs 20:cad3d54d7ecf 962 {
icraggs 36:2f1ada427e56 963 unsigned short mypacketid;
icraggs 36:2f1ada427e56 964 unsigned char dup, type;
icraggs 26:2658bb87c53d 965 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
icraggs 26:2658bb87c53d 966 rc = FAILURE;
icraggs 43:21da1f744243 967 else if (inflightMsgid == mypacketid)
icraggs 43:21da1f744243 968 inflightMsgid = 0;
icraggs 20:cad3d54d7ecf 969 }
icraggs 26:2658bb87c53d 970 else
icraggs 26:2658bb87c53d 971 rc = FAILURE;
Ian Craggs 12:cc7f2d62a393 972 }
icraggs 43:21da1f744243 973 #endif
icraggs 43:21da1f744243 974
icraggs 43:21da1f744243 975 exit:
icraggs 43:21da1f744243 976 if (rc != SUCCESS)
icraggs 53:15b5a280d22d 977 closeSession();
icraggs 43:21da1f744243 978 return rc;
icraggs 43:21da1f744243 979 }
icraggs 43:21da1f744243 980
icraggs 43:21da1f744243 981
icraggs 43:21da1f744243 982
icraggs 43:21da1f744243 983 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 43:21da1f744243 984 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
icraggs 43:21da1f744243 985 {
icraggs 43:21da1f744243 986 int rc = FAILURE;
icraggs 46:e335fcc1a663 987 Timer timer(command_timeout_ms);
icraggs 43:21da1f744243 988 MQTTString topicString = MQTTString_initializer;
icraggs 43:21da1f744243 989 int len = 0;
icraggs 43:21da1f744243 990
icraggs 43:21da1f744243 991 if (!isconnected)
icraggs 43:21da1f744243 992 goto exit;
icraggs 46:e335fcc1a663 993
icraggs 43:21da1f744243 994 topicString.cstring = (char*)topicName;
icraggs 43:21da1f744243 995
icraggs 43:21da1f744243 996 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 43:21da1f744243 997 if (qos == QOS1 || qos == QOS2)
icraggs 43:21da1f744243 998 id = packetid.getNext();
icraggs 43:21da1f744243 999 #endif
icraggs 43:21da1f744243 1000
icraggs 43:21da1f744243 1001 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
icraggs 43:21da1f744243 1002 topicString, (unsigned char*)payload, payloadlen);
icraggs 43:21da1f744243 1003 if (len <= 0)
icraggs 43:21da1f744243 1004 goto exit;
icraggs 46:e335fcc1a663 1005
icraggs 43:21da1f744243 1006 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 43:21da1f744243 1007 if (!cleansession)
icraggs 43:21da1f744243 1008 {
icraggs 43:21da1f744243 1009 memcpy(pubbuf, sendbuf, len);
icraggs 43:21da1f744243 1010 inflightMsgid = id;
icraggs 43:21da1f744243 1011 inflightLen = len;
icraggs 43:21da1f744243 1012 inflightQoS = qos;
icraggs 43:21da1f744243 1013 #if MQTTCLIENT_QOS2
icraggs 43:21da1f744243 1014 pubrel = false;
icraggs 43:21da1f744243 1015 #endif
icraggs 43:21da1f744243 1016 }
icraggs 43:21da1f744243 1017 #endif
icraggs 46:e335fcc1a663 1018
icraggs 43:21da1f744243 1019 rc = publish(len, timer, qos);
icraggs 15:64a57183aa03 1020 exit:
icraggs 8:c46930bd6c82 1021 return rc;
icraggs 8:c46930bd6c82 1022 }
icraggs 8:c46930bd6c82 1023
icraggs 8:c46930bd6c82 1024
icraggs 43:21da1f744243 1025 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 43:21da1f744243 1026 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
icraggs 43:21da1f744243 1027 {
icraggs 43:21da1f744243 1028 unsigned short id = 0; // dummy - not used for anything
icraggs 43:21da1f744243 1029 return publish(topicName, payload, payloadlen, id, qos, retained);
icraggs 43:21da1f744243 1030 }
icraggs 43:21da1f744243 1031
icraggs 43:21da1f744243 1032
icraggs 43:21da1f744243 1033 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 43:21da1f744243 1034 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
icraggs 43:21da1f744243 1035 {
icraggs 43:21da1f744243 1036 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
icraggs 43:21da1f744243 1037 }
icraggs 43:21da1f744243 1038
icraggs 43:21da1f744243 1039
icraggs 43:21da1f744243 1040 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 23:05fc7de97d4a 1041 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
icraggs 43:21da1f744243 1042 {
icraggs 26:2658bb87c53d 1043 int rc = FAILURE;
icraggs 53:15b5a280d22d 1044 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
icraggs 44:c299463ae853 1045 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
icraggs 26:2658bb87c53d 1046 if (len > 0)
icraggs 26:2658bb87c53d 1047 rc = sendPacket(len, timer); // send the disconnect packet
icraggs 53:15b5a280d22d 1048 closeSession();
icraggs 23:05fc7de97d4a 1049 return rc;
icraggs 20:cad3d54d7ecf 1050 }
icraggs 20:cad3d54d7ecf 1051
icraggs 53:15b5a280d22d 1052 #endif