Yalgaar mBed SDK for real-time messaging

Dependents:   YalgaarSDK

Fork of MQTT by Joerg Wende

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers PubSubClient.cpp Source File

PubSubClient.cpp

00001 /*
00002 PubSubClient.cpp - A simple client for MQTT.
00003 Nicholas O'Leary
00004 http://knolleary.net
00005 
00006 initial port for mbed
00007 Joerg Wende
00008 https://twitter.com/joerg_wende
00009 */
00010 
00011 #include "PubSubClient.h"
00012 
00013 
00014 
00015 
00016 Serial sls(USBTX, USBRX, 115200);
00017 
00018 int pubsub_error_code;
00019 
00020 
00021 int PubSubClient::millis()
00022 {
00023     return t.read_ms();
00024 }
00025 
00026 PubSubClient::PubSubClient()
00027 {
00028     this->_state = MQTT_DISCONNECTED;
00029     this->ip = "api.yalgaar.io";
00030     this->port = 1883;
00031     this->t.start();
00032 }
00033 
00034 PubSubClient::PubSubClient(char *ip, int port)
00035 {
00036     this->_state = MQTT_DISCONNECTED;
00037     this->ip = ip;
00038     this->port = port;
00039     this->t.start();
00040 }
00041 
00042 
00043 bool PubSubClient::connect(char *id)
00044 {
00045     return connect(id,NULL,NULL,0,0,0,0);
00046 }
00047 
00048 bool PubSubClient::connect(char *id, char *user, char *pass)
00049 {
00050     return connect(id,user,pass,0,0,0,0);
00051 }
00052 
00053 bool PubSubClient::connect(char *id, char* willTopic, short willQos, short willRetain, char* willMessage)
00054 {
00055     return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
00056 }
00057 
00058 bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, short willQos, short willRetain, char* willMessage)
00059 {
00060     if (!connected()) {
00061         int result = 0;
00062         result = _client.connect(this->ip, this->port);
00063         _client.set_blocking(false, 1);
00064         //pc1.printf("IP: %s\r\n",this->ip);
00065         //pc1.printf("Port: %i\r\n",this->port);
00066         //pc1.printf("Result: %i \r\n", result);
00067 
00068         if (result==0) {
00069             nextMsgId = 1;
00070             char d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
00071             // Leave room in the buffer for header and variable length field
00072             int length = 5;
00073             unsigned int j;
00074             for (j = 0; j<9; j++) {
00075                 buffer[length++] = d[j];
00076             }
00077 
00078             char v;
00079             if (willTopic) {
00080                 v = 0x06|(willQos<<3)|(willRetain<<5);
00081             } else {
00082                 v = 0x02;
00083             }
00084 
00085             if(user != NULL) {
00086                 v = v|0x80;
00087 
00088                 if(pass != NULL) {
00089                     v = v|(0x80>>1);
00090                 }
00091             }
00092 
00093             buffer[length++] = v;
00094 
00095             buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
00096             buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
00097             length = writeString(id,buffer,length);
00098             if (willTopic) {
00099                 length = writeString(willTopic,buffer,length);
00100                 length = writeString(willMessage,buffer,length);
00101             }
00102 
00103             if(user != NULL) {
00104                 length = writeString(user,buffer,length);
00105                 if(pass != NULL) {
00106                     length = writeString(pass,buffer,length);
00107                 }
00108             }
00109             //pc1.printf("Before MQTT Connect ... \r\n");
00110             write(MQTTCONNECT,buffer,length-5);
00111 
00112             lastInActivity = lastOutActivity = millis();
00113 
00114             int llen=128;
00115             int len =0;
00116 
00117             while ((len=readPacket(llen))==0) {
00118                 unsigned long t = millis();
00119                 if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
00120                     _state = MQTT_CONNECTION_TIMEOUT;
00121                     _client.close(true);
00122                     return false;
00123                 }
00124             }
00125             //pc1.printf("after MQTT Connect ... %i\r\n",len);
00126             if (len == 4) {
00127                 if (buffer[3] == 0) {
00128                     lastInActivity = millis();
00129                     pingOutstanding = false;
00130                     _state = MQTT_CONNECTED;
00131                     return true;
00132                 } else {
00133                     _state = buffer[3];
00134                 }
00135             }
00136         }
00137         _client.close(true);
00138     }
00139     return false;
00140 }
00141 
00142 
00143 int PubSubClient::readPacket(int lengthLength)
00144 {
00145     int len = 0;
00146     len = _client.receive_all(buffer,lengthLength);
00147     return len;
00148 }
00149 
00150 bool PubSubClient::loop()
00151 {
00152     if (connected()) {
00153         unsigned long t = millis();
00154         if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
00155             if (pingOutstanding) {
00156                 this->_state = MQTT_CONNECTION_TIMEOUT;
00157                 _client.close(true);
00158                 return false;
00159             } else {
00160                 buffer[0] = MQTTPINGREQ;
00161                 buffer[1] = 0;
00162                 _client.send(buffer,2);
00163                 lastOutActivity = t;
00164                 lastInActivity = t;
00165                 pingOutstanding = true;
00166             }
00167         }
00168         int len;
00169         int llen= 128;
00170         if (!((len=readPacket(llen))==0)) {
00171             if (len > 0) {
00172                 lastInActivity = t;
00173                 char type = buffer[0]&0xF0;
00174                 //pc1.printf("type :: %d\r\n",type);
00175                 //pc1.printf("buffer[%d] :: %d\r\n",len-1,buffer[len-1]);
00176                 if (type == MQTTPUBLISH) {
00177                     if (callback) {
00178                         //pc1.printf("MQTTPUBLISH received ... %i\r\n",len);
00179                         int tl = (buffer[2]<<8)+buffer[3];
00180                         //pc1.printf("t1 ... %i\r\n",tl);
00181                         char topic[tl+1];
00182                         for (int i=0; i<tl; i++) {
00183                             topic[i] = buffer[4+i];
00184                         }
00185                         topic[tl] = 0;
00186                         //pc1.printf("MQTTPUBLISH Topic ... %s\r\n",topic);
00187                         // ignore msgID - only support QoS 0 subs
00188                         int t2 = len-4-tl;
00189                         //pc1.printf("t2 ... %i\r\n",t2);
00190                         char payload[t2+1];
00191                         for (int i=0; i<t2; i++) {
00192                             payload[i] = buffer[4+i+tl];
00193                         }
00194                         payload[t2] = 0;
00195                         //pc1.printf("MQTTPUBLISH Payload ... %s\r\n",payload);
00196                         callback(topic,payload,t2);
00197                     }
00198                 } else if (type == MQTTPINGREQ) {
00199                     buffer[0] = MQTTPINGRESP;
00200                     buffer[1] = 0;
00201                     _client.send(buffer,2);
00202                 } else if (type == MQTTPINGRESP) {
00203                     pingOutstanding = false;
00204                 } else {
00205                     //for(int i=0;i<=len;i++)
00206                     //pc1.printf("buffer[%d] :: %d\r\n",i,buffer[i]);
00207                     if (buffer[0] == 144 && buffer[len-1] == 0) {
00208                         sls.printf("Subscribed\r\n");
00209                     }
00210                     if (true) {
00211                         //pc1.printf("In loop function buffer :: %d\r\n",buffer[len-1]);
00212                         pubsub_error_code = buffer[len-1];
00213                     }
00214                 }
00215             }
00216         }
00217         return true;
00218     }
00219     return false;
00220 }
00221 
00222 bool PubSubClient::publish(char* topic, char* payload)
00223 {
00224     return publish(topic,payload,strlen(payload),false);
00225 }
00226 
00227 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength)
00228 {
00229     return publish(topic, payload, plength, false);
00230 }
00231 
00232 bool PubSubClient::publish(char* topic, char* payload, unsigned int plength, bool retained)
00233 {
00234     if (connected()) {
00235         // Leave room in the buffer for header and variable length field
00236         //pc1.printf("in publish ... $$$$$$$$$$$:::%s\r\n",topic);
00237         //pc1.printf("in publish ... %s\r\n",payload);
00238         int length = 5;
00239         length = writeString(topic,buffer,length);
00240         int i;
00241         for (i=0; i<plength; i++) {
00242             buffer[length++] = payload[i];
00243         }
00244         short header = MQTTPUBLISH;
00245         if (retained) {
00246             header |= 1;
00247         }
00248         return write(header,buffer,length-5);
00249     }
00250     return false;
00251 }
00252 
00253 
00254 
00255 bool PubSubClient::write(short header, char* buf, int length)
00256 {
00257     short lenBuf[4];
00258     short llen = 0;
00259     short digit;
00260     short pos = 0;
00261     short rc;
00262     short len = length;
00263     //pc1.printf("in write ... %d\r\n",length);
00264     //pc1.printf("in write ...$$$$$$:: %s\r\n",buf);
00265     do {
00266         digit = len % 128;
00267         len = len / 128;
00268         if (len > 0) {
00269             digit |= 0x80;
00270         }
00271         lenBuf[pos++] = digit;
00272         llen++;
00273     } while(len>0);
00274 
00275     buf[4-llen] = header;
00276     for (int i=0; i<llen; i++) {
00277         buf[5-llen+i] = lenBuf[i];
00278     }
00279     rc = _client.send(buf+(4-llen),length+1+llen);
00280 
00281     lastOutActivity = millis();
00282     return (rc == 1+llen+length);
00283 }
00284 
00285 bool PubSubClient::subscribe(char* topic)
00286 {
00287     //pc1.printf("in subscribe ... %s\r\n",topic);
00288     if (connected()) {
00289         // Leave room in the buffer for header and variable length field
00290         int length = 5;
00291         nextMsgId++;
00292         if (nextMsgId == 0) {
00293             nextMsgId = 1;
00294         }
00295         buffer[length++] = (nextMsgId >> 8);
00296         buffer[length++] = (nextMsgId & 0xFF);
00297         length = writeString(topic, buffer,length);
00298         buffer[length++] = 0; // Only do QoS 0 subs
00299         return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
00300     }
00301     return false;
00302 }
00303 
00304 bool PubSubClient::unsubscribe(char* topic)
00305 {
00306     if (connected()) {
00307         int length = 5;
00308         nextMsgId++;
00309         if (nextMsgId == 0) {
00310             nextMsgId = 1;
00311         }
00312         buffer[length++] = (nextMsgId >> 8);
00313         buffer[length++] = (nextMsgId & 0xFF);
00314         length = writeString(topic, buffer,length);
00315         return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
00316     }
00317     return false;
00318 }
00319 
00320 void PubSubClient::disconnect()
00321 {
00322     buffer[0] = MQTTDISCONNECT;
00323     buffer[1] = 0;
00324     _client.send(buffer,2);
00325     _state = MQTT_DISCONNECTED;
00326     _client.close(true);
00327     lastInActivity = lastOutActivity = millis();
00328 }
00329 
00330 int PubSubClient::writeString(char* string, char* buf, int pos)
00331 {
00332     char* idp = string;
00333     int i = 0;
00334     pos += 2;
00335     while (*idp) {
00336         buf[pos++] = *idp++;
00337         i++;
00338     }
00339     buf[pos-i-2] = (i >> 8);
00340     buf[pos-i-1] = (i & 0xFF);
00341     return pos;
00342 }
00343 
00344 
00345 bool PubSubClient::connected()
00346 {
00347     bool rc;
00348     rc = (int)_client.is_connected();
00349     if (!rc) _client.close(true);
00350     return rc;
00351 }
00352 
00353 PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE)
00354 {
00355     this->callback = callback;
00356     return *this;
00357 }
00358 
00359 int PubSubClient::mqtt_state()
00360 {
00361     return this->_state;
00362 }