MQTTClient

Dependents:   IoTGateway_Basic test

Committer:
SomeRandomBloke
Date:
Tue Mar 20 20:53:22 2012 +0000
Revision:
0:260fb10c0755
Child:
1:a50b6c866a3b

        

Who changed what in which revision?

User | Revision | Line number | New contents of line
SomeRandomBloke 0:260fb10c0755 1 /*
SomeRandomBloke 0:260fb10c0755 2 MQTTClient.cpp
SomeRandomBloke 0:260fb10c0755 3 Based on MQTTClient from http://ceit.uq.edu.au/content/mqttclient-mbed-version-20
SomeRandomBloke 0:260fb10c0755 4 A simple MQTT client for mbed, version 2.0
SomeRandomBloke 0:260fb10c0755 5 By Yilun FAN, @CEIT, @JAN 2011
SomeRandomBloke 0:260fb10c0755 6
SomeRandomBloke 0:260fb10c0755 7 Bug fixes and additions by Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
SomeRandomBloke 0:260fb10c0755 8
SomeRandomBloke 0:260fb10c0755 9 Permission is hereby granted, free of charge, to any person obtaining a copy
SomeRandomBloke 0:260fb10c0755 10 of this software and associated documentation files (the "Software"), to deal
SomeRandomBloke 0:260fb10c0755 11 in the Software without restriction, including without limitation the rights
SomeRandomBloke 0:260fb10c0755 12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
SomeRandomBloke 0:260fb10c0755 13 copies of the Software, and to permit persons to whom the Software is
SomeRandomBloke 0:260fb10c0755 14 furnished to do so, subject to the following conditions:
SomeRandomBloke 0:260fb10c0755 15
SomeRandomBloke 0:260fb10c0755 16 The above copyright notice and this permission notice shall be included in
SomeRandomBloke 0:260fb10c0755 17 all copies or substantial portions of the Software.
SomeRandomBloke 0:260fb10c0755 18
SomeRandomBloke 0:260fb10c0755 19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
SomeRandomBloke 0:260fb10c0755 20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
SomeRandomBloke 0:260fb10c0755 21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
SomeRandomBloke 0:260fb10c0755 22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
SomeRandomBloke 0:260fb10c0755 23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
SomeRandomBloke 0:260fb10c0755 24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
SomeRandomBloke 0:260fb10c0755 25 THE SOFTWARE.
SomeRandomBloke 0:260fb10c0755 26 */
SomeRandomBloke 0:260fb10c0755 27
SomeRandomBloke 0:260fb10c0755 28 #include "MQTTClient.h"
SomeRandomBloke 0:260fb10c0755 29
SomeRandomBloke 0:260fb10c0755 30 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) {
SomeRandomBloke 0:260fb10c0755 31 this->port = port;
SomeRandomBloke 0:260fb10c0755 32 callback_server = callback;
SomeRandomBloke 0:260fb10c0755 33 serverIp = server;
SomeRandomBloke 0:260fb10c0755 34 connected = false;
SomeRandomBloke 0:260fb10c0755 35 sessionOpened = false;
SomeRandomBloke 0:260fb10c0755 36 timer.start();
SomeRandomBloke 0:260fb10c0755 37 }
SomeRandomBloke 0:260fb10c0755 38
SomeRandomBloke 0:260fb10c0755 39 MQTTClient::MQTTClient() {}
SomeRandomBloke 0:260fb10c0755 40
SomeRandomBloke 0:260fb10c0755 41 MQTTClient::~MQTTClient() {}
SomeRandomBloke 0:260fb10c0755 42
SomeRandomBloke 0:260fb10c0755 43
SomeRandomBloke 0:260fb10c0755 44 void MQTTClient::init(IpAddr *server, int port, void (*callback)(char*, char*)) {
SomeRandomBloke 0:260fb10c0755 45 this->port = port;
SomeRandomBloke 0:260fb10c0755 46 callback_server = callback;
SomeRandomBloke 0:260fb10c0755 47 serverIp = *server;
SomeRandomBloke 0:260fb10c0755 48 this->userName = NULL;
SomeRandomBloke 0:260fb10c0755 49 this->password = NULL;
SomeRandomBloke 0:260fb10c0755 50 connected = false;
SomeRandomBloke 0:260fb10c0755 51 sessionOpened = false;
SomeRandomBloke 0:260fb10c0755 52 timer.start();
SomeRandomBloke 0:260fb10c0755 53 }
SomeRandomBloke 0:260fb10c0755 54
SomeRandomBloke 0:260fb10c0755 55 void MQTTClient::init(IpAddr *server, int port, char *userName, char *password, void (*callback)(char*, char*)) {
SomeRandomBloke 0:260fb10c0755 56 this->port = port;
SomeRandomBloke 0:260fb10c0755 57 callback_server = callback;
SomeRandomBloke 0:260fb10c0755 58 serverIp = *server;
SomeRandomBloke 0:260fb10c0755 59 this->userName = userName;
SomeRandomBloke 0:260fb10c0755 60 this->password = password;
SomeRandomBloke 0:260fb10c0755 61 connected = false;
SomeRandomBloke 0:260fb10c0755 62 sessionOpened = false;
SomeRandomBloke 0:260fb10c0755 63 timer.start();
SomeRandomBloke 0:260fb10c0755 64 }
SomeRandomBloke 0:260fb10c0755 65
SomeRandomBloke 0:260fb10c0755 66 int MQTTClient::send_data(const char* msg, int size) {
SomeRandomBloke 0:260fb10c0755 67 int transLen = pTCPSocket->send(msg, size);
SomeRandomBloke 0:260fb10c0755 68
SomeRandomBloke 0:260fb10c0755 69 /*Check send length matches the message length*/
SomeRandomBloke 0:260fb10c0755 70 if (transLen != size) {
SomeRandomBloke 0:260fb10c0755 71 for (int i = 0; i < size; i++) {
SomeRandomBloke 0:260fb10c0755 72 printf("%x, ", msg[i]);
SomeRandomBloke 0:260fb10c0755 73 }
SomeRandomBloke 0:260fb10c0755 74 printf("Error on send.\r\n");
SomeRandomBloke 0:260fb10c0755 75 return -1;
SomeRandomBloke 0:260fb10c0755 76 }
SomeRandomBloke 0:260fb10c0755 77 return 1;
SomeRandomBloke 0:260fb10c0755 78 }
SomeRandomBloke 0:260fb10c0755 79
SomeRandomBloke 0:260fb10c0755 80 int MQTTClient::open_session(char* id) {
SomeRandomBloke 0:260fb10c0755 81 /*variable header*/
SomeRandomBloke 0:260fb10c0755 82 char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
SomeRandomBloke 0:260fb10c0755 83
SomeRandomBloke 0:260fb10c0755 84 /*fixed header: 2 bytes, big endian*/
SomeRandomBloke 0:260fb10c0755 85 char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
SomeRandomBloke 0:260fb10c0755 86 short usernameLen = strlen( userName );
SomeRandomBloke 0:260fb10c0755 87 short passwordLen = strlen( password );
SomeRandomBloke 0:260fb10c0755 88 var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 );
SomeRandomBloke 0:260fb10c0755 89 var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 );
SomeRandomBloke 0:260fb10c0755 90
SomeRandomBloke 0:260fb10c0755 91 // printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), sizeof(id), usernameLen, passwordLen );
SomeRandomBloke 0:260fb10c0755 92 char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id) + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ];
SomeRandomBloke 0:260fb10c0755 93
SomeRandomBloke 0:260fb10c0755 94 memset(packet,0,sizeof(packet));
SomeRandomBloke 0:260fb10c0755 95 memcpy(packet,fixed_header,sizeof(fixed_header));
SomeRandomBloke 0:260fb10c0755 96 memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
SomeRandomBloke 0:260fb10c0755 97 memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
SomeRandomBloke 0:260fb10c0755 98 if ( usernameLen > 0 ) {
SomeRandomBloke 0:260fb10c0755 99 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)] = 0x00;
SomeRandomBloke 0:260fb10c0755 100 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)+1] = usernameLen;
SomeRandomBloke 0:260fb10c0755 101 memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+2, userName, usernameLen);
SomeRandomBloke 0:260fb10c0755 102 packet[1] += (usernameLen + 2);
SomeRandomBloke 0:260fb10c0755 103 }
SomeRandomBloke 0:260fb10c0755 104 if ( passwordLen > 0 ) {
SomeRandomBloke 0:260fb10c0755 105 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) ] = 0x00;
SomeRandomBloke 0:260fb10c0755 106 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0)+1] = passwordLen;
SomeRandomBloke 0:260fb10c0755 107 memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+ usernameLen + (usernameLen > 0 ? 2 : 0)+2, password, passwordLen);
SomeRandomBloke 0:260fb10c0755 108 packet[1] += (passwordLen + 2);
SomeRandomBloke 0:260fb10c0755 109 }
SomeRandomBloke 0:260fb10c0755 110
SomeRandomBloke 0:260fb10c0755 111 /* printf("Packet length %d\n", sizeof(packet));
SomeRandomBloke 0:260fb10c0755 112 for (int i = 0; i < sizeof(packet); i++) {
SomeRandomBloke 0:260fb10c0755 113 printf("%x, ", packet[i]);
SomeRandomBloke 0:260fb10c0755 114 }
SomeRandomBloke 0:260fb10c0755 115 printf("\r\n");
SomeRandomBloke 0:260fb10c0755 116 */
SomeRandomBloke 0:260fb10c0755 117 if (!send_data(packet, sizeof(packet))) {
SomeRandomBloke 0:260fb10c0755 118 return -1;
SomeRandomBloke 0:260fb10c0755 119 }
SomeRandomBloke 0:260fb10c0755 120 // printf("Sent\n");
SomeRandomBloke 0:260fb10c0755 121 return 1;
SomeRandomBloke 0:260fb10c0755 122 }
SomeRandomBloke 0:260fb10c0755 123
SomeRandomBloke 0:260fb10c0755 124 int MQTTClient::connect(char* id) {
SomeRandomBloke 0:260fb10c0755 125 clientId = id;
SomeRandomBloke 0:260fb10c0755 126
SomeRandomBloke 0:260fb10c0755 127 /*Initial TCP socket*/
SomeRandomBloke 0:260fb10c0755 128 pTCPSocket = new TCPSocket;
SomeRandomBloke 0:260fb10c0755 129 pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
SomeRandomBloke 0:260fb10c0755 130
SomeRandomBloke 0:260fb10c0755 131 host.setPort(port);
SomeRandomBloke 0:260fb10c0755 132 host.setIp(serverIp);
SomeRandomBloke 0:260fb10c0755 133 host.setName("localhost");
SomeRandomBloke 0:260fb10c0755 134
SomeRandomBloke 0:260fb10c0755 135 /*Trying to connect to host*/
SomeRandomBloke 0:260fb10c0755 136 printf("Trying to connect to host..\r\n\r\n");
SomeRandomBloke 0:260fb10c0755 137 TCPSocketErr err = pTCPSocket->connect(host);
SomeRandomBloke 0:260fb10c0755 138
SomeRandomBloke 0:260fb10c0755 139 Net::poll();
SomeRandomBloke 0:260fb10c0755 140 if (err) {
SomeRandomBloke 0:260fb10c0755 141 printf("Error connecting to host [%d]\r\n", (int) err);
SomeRandomBloke 0:260fb10c0755 142 return -1;
SomeRandomBloke 0:260fb10c0755 143 }
SomeRandomBloke 0:260fb10c0755 144 printf("Connect to host sucessed..\r\n\r\n");
SomeRandomBloke 0:260fb10c0755 145
SomeRandomBloke 0:260fb10c0755 146 /*Wait TCP connection with server to be established*/
SomeRandomBloke 0:260fb10c0755 147 int i = 0;
SomeRandomBloke 0:260fb10c0755 148 while (!connected) {
SomeRandomBloke 0:260fb10c0755 149 Net::poll();
SomeRandomBloke 0:260fb10c0755 150 wait(1);
SomeRandomBloke 0:260fb10c0755 151 i++;
SomeRandomBloke 0:260fb10c0755 152 printf("Wait for connections %d..\r\n", i);
SomeRandomBloke 0:260fb10c0755 153 if (i == 35) {//If wait too long, give up.
SomeRandomBloke 0:260fb10c0755 154 return -1;
SomeRandomBloke 0:260fb10c0755 155 }
SomeRandomBloke 0:260fb10c0755 156 }
SomeRandomBloke 0:260fb10c0755 157
SomeRandomBloke 0:260fb10c0755 158 /*Send open session message to server*/
SomeRandomBloke 0:260fb10c0755 159 open_session(id);
SomeRandomBloke 0:260fb10c0755 160
SomeRandomBloke 0:260fb10c0755 161 /*Wait server notice of open sesion*/
SomeRandomBloke 0:260fb10c0755 162 while (!sessionOpened) {
SomeRandomBloke 0:260fb10c0755 163 Net::poll();
SomeRandomBloke 0:260fb10c0755 164 wait(1);
SomeRandomBloke 0:260fb10c0755 165 if (!connected) {
SomeRandomBloke 0:260fb10c0755 166 break;
SomeRandomBloke 0:260fb10c0755 167 }
SomeRandomBloke 0:260fb10c0755 168 printf("Wait for session..\r\n");
SomeRandomBloke 0:260fb10c0755 169 }
SomeRandomBloke 0:260fb10c0755 170 if (!connected) {
SomeRandomBloke 0:260fb10c0755 171 return -2;
SomeRandomBloke 0:260fb10c0755 172 }
SomeRandomBloke 0:260fb10c0755 173 lastActivity = timer.read_ms();
SomeRandomBloke 0:260fb10c0755 174 return 1;
SomeRandomBloke 0:260fb10c0755 175 }
SomeRandomBloke 0:260fb10c0755 176
SomeRandomBloke 0:260fb10c0755 177 int MQTTClient::publish(char* pub_topic, char* msg) {
SomeRandomBloke 0:260fb10c0755 178 uint8_t var_header_pub[strlen(pub_topic)+3];
SomeRandomBloke 0:260fb10c0755 179 strcpy((char *)&var_header_pub[2], pub_topic);
SomeRandomBloke 0:260fb10c0755 180 var_header_pub[0] = 0;
SomeRandomBloke 0:260fb10c0755 181 var_header_pub[1] = strlen(pub_topic);
SomeRandomBloke 0:260fb10c0755 182 var_header_pub[sizeof(var_header_pub)-1] = 0;
SomeRandomBloke 0:260fb10c0755 183
SomeRandomBloke 0:260fb10c0755 184 uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
SomeRandomBloke 0:260fb10c0755 185
SomeRandomBloke 0:260fb10c0755 186 uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
SomeRandomBloke 0:260fb10c0755 187 memset(packet_pub,0,sizeof(packet_pub));
SomeRandomBloke 0:260fb10c0755 188 memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
SomeRandomBloke 0:260fb10c0755 189 memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
SomeRandomBloke 0:260fb10c0755 190 memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
SomeRandomBloke 0:260fb10c0755 191
SomeRandomBloke 0:260fb10c0755 192 if (!send_data((char*)packet_pub, sizeof(packet_pub))) {
SomeRandomBloke 0:260fb10c0755 193 return -1;
SomeRandomBloke 0:260fb10c0755 194 }
SomeRandomBloke 0:260fb10c0755 195 return 1;
SomeRandomBloke 0:260fb10c0755 196 }
SomeRandomBloke 0:260fb10c0755 197
SomeRandomBloke 0:260fb10c0755 198
SomeRandomBloke 0:260fb10c0755 199 void MQTTClient::disconnect() {
SomeRandomBloke 0:260fb10c0755 200 char packet_224[] = {0xe0, 0x00};
SomeRandomBloke 0:260fb10c0755 201 send_data((char*)packet_224, 2);
SomeRandomBloke 0:260fb10c0755 202
SomeRandomBloke 0:260fb10c0755 203 connected = false;
SomeRandomBloke 0:260fb10c0755 204 }
SomeRandomBloke 0:260fb10c0755 205
SomeRandomBloke 0:260fb10c0755 206 void MQTTClient::read_data() {
SomeRandomBloke 0:260fb10c0755 207 char buffer[1024];
SomeRandomBloke 0:260fb10c0755 208 int len = 0, readLen;
SomeRandomBloke 0:260fb10c0755 209
SomeRandomBloke 0:260fb10c0755 210 while ((readLen = pTCPSocket->recv(buffer, 1024)) != 0) {
SomeRandomBloke 0:260fb10c0755 211 len += readLen;
SomeRandomBloke 0:260fb10c0755 212 }
SomeRandomBloke 0:260fb10c0755 213
SomeRandomBloke 0:260fb10c0755 214 buffer[len] = '\0';
SomeRandomBloke 0:260fb10c0755 215
SomeRandomBloke 0:260fb10c0755 216
SomeRandomBloke 0:260fb10c0755 217 printf("Read length: %d %d\r\n", len, readLen);
SomeRandomBloke 0:260fb10c0755 218
SomeRandomBloke 0:260fb10c0755 219 for (int i = 0; i < len; i++) {
SomeRandomBloke 0:260fb10c0755 220 printf("%2X ", buffer[i]);
SomeRandomBloke 0:260fb10c0755 221 }
SomeRandomBloke 0:260fb10c0755 222 printf("\r\n");
SomeRandomBloke 0:260fb10c0755 223
SomeRandomBloke 0:260fb10c0755 224 char type = buffer[0]>>4;
SomeRandomBloke 0:260fb10c0755 225 if ( type == 2 ) { // CONNACK
SomeRandomBloke 0:260fb10c0755 226 printf("CONNACK\n");
SomeRandomBloke 0:260fb10c0755 227
SomeRandomBloke 0:260fb10c0755 228 } else if (type == 3) { // PUBLISH
SomeRandomBloke 0:260fb10c0755 229 if (callback_server) {
SomeRandomBloke 0:260fb10c0755 230 short index = 1;
SomeRandomBloke 0:260fb10c0755 231 short multiplier = 1;
SomeRandomBloke 0:260fb10c0755 232 short value = 0;
SomeRandomBloke 0:260fb10c0755 233 uint8_t digit;
SomeRandomBloke 0:260fb10c0755 234 do {
SomeRandomBloke 0:260fb10c0755 235 digit = buffer[index++];
SomeRandomBloke 0:260fb10c0755 236 value += (digit & 127) * multiplier;
SomeRandomBloke 0:260fb10c0755 237 multiplier *= 128;
SomeRandomBloke 0:260fb10c0755 238 } while ((digit & 128) != 0);
SomeRandomBloke 0:260fb10c0755 239 printf( "variable length %d, index %d\n", value, index );
SomeRandomBloke 0:260fb10c0755 240
SomeRandomBloke 0:260fb10c0755 241 // uint8_t tl = (buffer[2]<<3)+buffer[3];
SomeRandomBloke 0:260fb10c0755 242 uint8_t tl = (buffer[index]<<3)+buffer[index+1];
SomeRandomBloke 0:260fb10c0755 243 printf("Topic len %d\n",tl);
SomeRandomBloke 0:260fb10c0755 244 char topic[tl+1];
SomeRandomBloke 0:260fb10c0755 245 for (int i=0; i<tl; i++) {
SomeRandomBloke 0:260fb10c0755 246 topic[i] = buffer[index+2+i];
SomeRandomBloke 0:260fb10c0755 247 }
SomeRandomBloke 0:260fb10c0755 248 topic[tl] = 0;
SomeRandomBloke 0:260fb10c0755 249 // ignore msgID - only support QoS 0 subs
SomeRandomBloke 0:260fb10c0755 250 char *payload = buffer+index+2+tl;
SomeRandomBloke 0:260fb10c0755 251 callback_server(topic,(char*)payload);
SomeRandomBloke 0:260fb10c0755 252 }
SomeRandomBloke 0:260fb10c0755 253 } else if (type == 12) { // PINGREG -- Ask for alive
SomeRandomBloke 0:260fb10c0755 254 char packet_208[] = {0xd0, 0x00};
SomeRandomBloke 0:260fb10c0755 255 send_data((char*)packet_208, 2);
SomeRandomBloke 0:260fb10c0755 256 lastActivity = timer.read_ms();
SomeRandomBloke 0:260fb10c0755 257 }
SomeRandomBloke 0:260fb10c0755 258 }
SomeRandomBloke 0:260fb10c0755 259
SomeRandomBloke 0:260fb10c0755 260 void MQTTClient::read_open_session() {
SomeRandomBloke 0:260fb10c0755 261 char buffer[32];
SomeRandomBloke 0:260fb10c0755 262 int len = 0, readLen;
SomeRandomBloke 0:260fb10c0755 263
SomeRandomBloke 0:260fb10c0755 264 while ((readLen = pTCPSocket->recv(buffer, 32)) != 0) {
SomeRandomBloke 0:260fb10c0755 265 len += readLen;
SomeRandomBloke 0:260fb10c0755 266 }
SomeRandomBloke 0:260fb10c0755 267
SomeRandomBloke 0:260fb10c0755 268 if (len == 4 && buffer[3] == 0) {
SomeRandomBloke 0:260fb10c0755 269 printf("Session opened\r\n");
SomeRandomBloke 0:260fb10c0755 270 sessionOpened = true;
SomeRandomBloke 0:260fb10c0755 271 }
SomeRandomBloke 0:260fb10c0755 272 }
SomeRandomBloke 0:260fb10c0755 273
SomeRandomBloke 0:260fb10c0755 274 int MQTTClient::subscribe(char* topic) {
SomeRandomBloke 0:260fb10c0755 275
SomeRandomBloke 0:260fb10c0755 276 if (connected) {
SomeRandomBloke 0:260fb10c0755 277 uint8_t var_header_topic[] = {0,10};
SomeRandomBloke 0:260fb10c0755 278 uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
SomeRandomBloke 0:260fb10c0755 279
SomeRandomBloke 0:260fb10c0755 280 //utf topic
SomeRandomBloke 0:260fb10c0755 281 uint8_t utf_topic[strlen(topic)+3];
SomeRandomBloke 0:260fb10c0755 282 strcpy((char *)&utf_topic[2], topic);
SomeRandomBloke 0:260fb10c0755 283
SomeRandomBloke 0:260fb10c0755 284 utf_topic[0] = 0;
SomeRandomBloke 0:260fb10c0755 285 utf_topic[1] = strlen(topic);
SomeRandomBloke 0:260fb10c0755 286 utf_topic[sizeof(utf_topic)-1] = 0;
SomeRandomBloke 0:260fb10c0755 287
SomeRandomBloke 0:260fb10c0755 288 char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
SomeRandomBloke 0:260fb10c0755 289 memset(packet_topic,0,sizeof(packet_topic));
SomeRandomBloke 0:260fb10c0755 290 memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
SomeRandomBloke 0:260fb10c0755 291 memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
SomeRandomBloke 0:260fb10c0755 292 memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
SomeRandomBloke 0:260fb10c0755 293
SomeRandomBloke 0:260fb10c0755 294 if (!send_data(packet_topic, sizeof(packet_topic))) {
SomeRandomBloke 0:260fb10c0755 295 return -1;
SomeRandomBloke 0:260fb10c0755 296 }
SomeRandomBloke 0:260fb10c0755 297 return 1;
SomeRandomBloke 0:260fb10c0755 298 }
SomeRandomBloke 0:260fb10c0755 299 return -1;
SomeRandomBloke 0:260fb10c0755 300 }
SomeRandomBloke 0:260fb10c0755 301
SomeRandomBloke 0:260fb10c0755 302 void MQTTClient::live() {
SomeRandomBloke 0:260fb10c0755 303 if (connected) {
SomeRandomBloke 0:260fb10c0755 304 int t = timer.read_ms();
SomeRandomBloke 0:260fb10c0755 305 if (t - lastActivity > KEEPALIVE) {
SomeRandomBloke 0:260fb10c0755 306 //Send 192 0 to broker
SomeRandomBloke 0:260fb10c0755 307 printf("Send 192\r\n");
SomeRandomBloke 0:260fb10c0755 308 char packet_192[] = {0xc0, 0x00};
SomeRandomBloke 0:260fb10c0755 309 send_data((char*)packet_192, 2);
SomeRandomBloke 0:260fb10c0755 310 lastActivity = t;
SomeRandomBloke 0:260fb10c0755 311 }
SomeRandomBloke 0:260fb10c0755 312 }
SomeRandomBloke 0:260fb10c0755 313 }
SomeRandomBloke 0:260fb10c0755 314
SomeRandomBloke 0:260fb10c0755 315 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) {
SomeRandomBloke 0:260fb10c0755 316 switch (e) {
SomeRandomBloke 0:260fb10c0755 317 case TCPSOCKET_ACCEPT:
SomeRandomBloke 0:260fb10c0755 318 printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
SomeRandomBloke 0:260fb10c0755 319 break;
SomeRandomBloke 0:260fb10c0755 320 case TCPSOCKET_CONNECTED:
SomeRandomBloke 0:260fb10c0755 321 printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
SomeRandomBloke 0:260fb10c0755 322 connected = true;
SomeRandomBloke 0:260fb10c0755 323 break;
SomeRandomBloke 0:260fb10c0755 324 case TCPSOCKET_WRITEABLE:
SomeRandomBloke 0:260fb10c0755 325 printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");
SomeRandomBloke 0:260fb10c0755 326 break;
SomeRandomBloke 0:260fb10c0755 327 case TCPSOCKET_READABLE:
SomeRandomBloke 0:260fb10c0755 328 printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
SomeRandomBloke 0:260fb10c0755 329 if (!sessionOpened) {
SomeRandomBloke 0:260fb10c0755 330 read_open_session();
SomeRandomBloke 0:260fb10c0755 331 }
SomeRandomBloke 0:260fb10c0755 332 read_data();
SomeRandomBloke 0:260fb10c0755 333 break;
SomeRandomBloke 0:260fb10c0755 334 case TCPSOCKET_CONTIMEOUT:
SomeRandomBloke 0:260fb10c0755 335 printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
SomeRandomBloke 0:260fb10c0755 336 break;
SomeRandomBloke 0:260fb10c0755 337 case TCPSOCKET_CONRST:
SomeRandomBloke 0:260fb10c0755 338 printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
SomeRandomBloke 0:260fb10c0755 339 break;
SomeRandomBloke 0:260fb10c0755 340 case TCPSOCKET_CONABRT:
SomeRandomBloke 0:260fb10c0755 341 printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
SomeRandomBloke 0:260fb10c0755 342 break;
SomeRandomBloke 0:260fb10c0755 343 case TCPSOCKET_ERROR:
SomeRandomBloke 0:260fb10c0755 344 printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
SomeRandomBloke 0:260fb10c0755 345 break;
SomeRandomBloke 0:260fb10c0755 346 case TCPSOCKET_DISCONNECTED:
SomeRandomBloke 0:260fb10c0755 347 printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
SomeRandomBloke 0:260fb10c0755 348 pTCPSocket->close();
SomeRandomBloke 0:260fb10c0755 349 connected = false;
SomeRandomBloke 0:260fb10c0755 350 break;
SomeRandomBloke 0:260fb10c0755 351 }
SomeRandomBloke 0:260fb10c0755 352 }