MQTT Client for ENC28J60 Ethernet modules. Depends on the UIPEthernet library.

Dependents:   mBuino_ENC28_MQTT MQTT_DHT11_ENC28J60 MQTT_DHT11_ENC28J60_tushar MQTT_Hello_ENC28J60

MQTT library for ENC28J60 Ethernet modules.

/media/uploads/hudakz/enc28j60_module01.jpg

Depends on the UIPEthernet library.

Example program:

Import programMQTT_Hello_ENC28J60

MQTT Client example program. Ethernet connection is via an ENC28J60 module.

Revision:
0:83c732b10e95
Child:
1:87da395325fc
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTClient.cpp	Mon Sep 15 12:43:54 2014 +0000
@@ -0,0 +1,406 @@
+/*
+ MQTTClient.cpp - A simple client for MQTT.
+  Nicholas O'Leary
+  http://knolleary.net
+*/
+
+#include "MQTTClient.h"
+#include <string.h>
+#include <time.h>
+
+MQTTClient::MQTTClient()
+{
+    this->_client = NULL;
+    this->stream = NULL;
+}
+
+MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client)
+{
+    this->_client = &client;
+    this->onMessage = onMessage;
+    this->ip = ip;
+    this->port = port;
+    this->domain = NULL;
+    this->stream = NULL;
+}
+
+MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client)
+{
+    this->_client = &client;
+    this->onMessage = onMessage;
+    this->domain = domain;
+    this->port = port;
+    this->stream = NULL;
+}
+
+MQTTClient::MQTTClient(IPAddress& ip, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream)
+{
+    this->_client = &client;
+    this->onMessage = onMessage;
+    this->ip = ip;
+    this->port = port;
+    this->domain = NULL;
+    this->stream = &stream;
+}
+
+MQTTClient::MQTTClient(char* domain, uint16_t port, void (*onMessage)(char*,uint8_t*,unsigned int), Client& client, Stream& stream)
+{
+    this->_client = &client;
+    this->onMessage = onMessage;
+    this->domain = domain;
+    this->port = port;
+    this->stream = &stream;
+}
+
+bool MQTTClient::connect(char *id)
+{
+    return connect(id,NULL,NULL,0,0,0,0);
+}
+
+bool MQTTClient::connect(char *id, char *user, char *pass)
+{
+    return connect(id,user,pass,0,0,0,0);
+}
+
+bool MQTTClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
+{
+    return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
+}
+
+bool MQTTClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
+{
+    if (!connected()) {
+        int result = 0;
+
+        if (domain != NULL) {
+            result = _client->connect(this->domain, this->port);
+        } else {
+            result = _client->connect(this->ip, this->port);
+        }
+
+        if (result) {
+            nextMsgId = 1;
+            uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
+            // Leave room in the buffer for header and variable length field
+            uint16_t length = 5;
+            unsigned int j;
+            for (j = 0; j<9; j++) {
+                buffer[length++] = d[j];
+            }
+
+            uint8_t v;
+            if (willTopic) {
+                v = 0x06|(willQos<<3)|(willRetain<<5);
+            } else {
+                v = 0x02;
+            }
+
+            if(user != NULL) {
+                v = v|0x80;
+
+                if(pass != NULL) {
+                    v = v|(0x80>>1);
+                }
+            }
+
+            buffer[length++] = v;
+
+            buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
+            buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
+            length = writeString(id,buffer,length);
+            if (willTopic) {
+                length = writeString(willTopic,buffer,length);
+                length = writeString(willMessage,buffer,length);
+            }
+
+            if(user != NULL) {
+                length = writeString(user,buffer,length);
+                if(pass != NULL) {
+                    length = writeString(pass,buffer,length);
+                }
+            }
+
+            write(MQTTCONNECT,buffer,length-5);
+
+            lastInActivity = lastOutActivity = time(NULL);
+
+            while (!_client->available()) {
+                time_t t = time(NULL);
+                if (t-lastInActivity > MQTT_KEEPALIVE) {
+                    _client->stop();
+                    return false;
+                }
+            }
+            uint8_t llen;
+            uint16_t len = readPacket(&llen);
+
+            if (len == 4 && buffer[3] == 0) {
+                lastInActivity = time(NULL);
+                pingOutstanding = false;
+                return true;
+            }
+        }
+        _client->stop();
+    }
+    return false;
+}
+
+uint8_t MQTTClient::readByte()
+{
+    while(!_client->available()) {}
+    return _client->read();
+}
+
+uint16_t MQTTClient::readPacket(uint8_t* lengthLength)
+{
+    uint16_t len = 0;
+    buffer[len++] = readByte();
+    bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
+    uint32_t multiplier = 1;
+    uint16_t length = 0;
+    uint8_t digit = 0;
+    uint16_t skip = 0;
+    uint8_t start = 0;
+
+    do {
+        digit = readByte();
+        buffer[len++] = digit;
+        length += (digit & 127) * multiplier;
+        multiplier *= 128;
+    } while ((digit & 128) != 0);
+    *lengthLength = len-1;
+
+    if (isPublish) {
+        // Read in topic length to calculate bytes to skip over for Stream writing
+        buffer[len++] = readByte();
+        buffer[len++] = readByte();
+        skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
+        start = 2;
+        if (buffer[0]&MQTTQOS1) {
+            // skip message id
+            skip += 2;
+        }
+    }
+
+    for (uint16_t i = start; i<length; i++) {
+        digit = readByte();
+        if (this->stream) {
+            if (isPublish && len-*lengthLength-2>skip) {
+                this->stream->putc(digit);
+            }
+        }
+        if (len < MQTT_MAX_PACKET_SIZE) {
+            buffer[len] = digit;
+        }
+        len++;
+    }
+
+    if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
+        len = 0; // This will cause the packet to be ignored.
+    }
+
+    return len;
+}
+
+bool MQTTClient::loop()
+{
+    if (connected()) {
+        time_t t = time(NULL);
+        if ((t - lastInActivity > MQTT_KEEPALIVE) || (t - lastOutActivity > MQTT_KEEPALIVE)) {
+            if (pingOutstanding) {
+                _client->stop();
+                return false;
+            } else {
+                buffer[0] = MQTTPINGREQ;
+                buffer[1] = 0;
+                _client->write(buffer,2);
+                lastOutActivity = t;
+                lastInActivity = t;
+                pingOutstanding = true;
+            }
+        }
+        if (_client->available()) {
+            uint8_t llen;
+            uint16_t len = readPacket(&llen);
+            uint16_t msgId = 0;
+            uint8_t *payload;
+            if (len > 0) {
+                lastInActivity = t;
+                uint8_t type = buffer[0]&0xF0;
+                if (type == MQTTPUBLISH) {
+                    if (onMessage) {
+                        uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
+                        char topic[tl+1];
+                        for (uint16_t i=0; i<tl; i++) {
+                            topic[i] = buffer[llen+3+i];
+                        }
+                        topic[tl] = 0;
+                        // msgId only present for QOS>0
+                        if ((buffer[0]&0x06) == MQTTQOS1) {
+                            msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
+                            payload = buffer+llen+3+tl+2;
+                            onMessage(topic,payload,len-llen-3-tl-2);
+
+                            buffer[0] = MQTTPUBACK;
+                            buffer[1] = 2;
+                            buffer[2] = (msgId >> 8);
+                            buffer[3] = (msgId & 0xFF);
+                            _client->write(buffer,4);
+                            lastOutActivity = t;
+
+                        } else {
+                            payload = buffer+llen+3+tl;
+                            onMessage(topic,payload,len-llen-3-tl);
+                        }
+                    }
+                } else if (type == MQTTPINGREQ) {
+                    buffer[0] = MQTTPINGRESP;
+                    buffer[1] = 0;
+                    _client->write(buffer,2);
+                } else if (type == MQTTPINGRESP) {
+                    pingOutstanding = false;
+                }
+            }
+        }
+        return true;
+    }
+    return false;
+}
+
+bool MQTTClient::publish(char* topic, char* payload)
+{
+    return publish(topic,(uint8_t*)payload,strlen(payload),false);
+}
+
+bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength)
+{
+    return publish(topic, payload, plength, false);
+}
+
+bool MQTTClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained)
+{
+    if (connected()) {
+        // Leave room in the buffer for header and variable length field
+        uint16_t length = 5;
+        length = writeString(topic,buffer,length);
+        uint16_t i;
+        for (i=0; i<plength; i++) {
+            buffer[length++] = payload[i];
+        }
+        uint8_t header = MQTTPUBLISH;
+        if (retained) {
+            header |= 1;
+        }
+        return write(header,buffer,length-5);
+    }
+    return false;
+}
+
+bool MQTTClient::write(uint8_t header, uint8_t* buf, uint16_t length)
+{
+    uint8_t lenBuf[4];
+    uint8_t llen = 0;
+    uint8_t digit;
+    uint8_t pos = 0;
+    uint8_t rc;
+    uint8_t len = length;
+    do {
+        digit = len % 128;
+        len = len / 128;
+        if (len > 0) {
+            digit |= 0x80;
+        }
+        lenBuf[pos++] = digit;
+        llen++;
+    } while(len>0);
+
+    buf[4-llen] = header;
+    for (int i=0; i<llen; i++) {
+        buf[5-llen+i] = lenBuf[i];
+    }
+    rc = _client->write(buf+(4-llen),length+1+llen);
+
+    lastOutActivity = time(NULL);
+    return (rc == 1+llen+length);
+}
+
+bool MQTTClient::subscribe(char* topic)
+{
+    return subscribe(topic, 0);
+}
+
+bool MQTTClient::subscribe(char* topic, uint8_t qos)
+{
+    if (qos > 1)
+        return false;
+
+    if (connected()) {
+        // Leave room in the buffer for header and variable length field
+        uint16_t length = 5;
+        nextMsgId++;
+        if (nextMsgId == 0) {
+            nextMsgId = 1;
+        }
+        buffer[length++] = (nextMsgId >> 8);
+        buffer[length++] = (nextMsgId & 0xFF);
+        length = writeString(topic, buffer,length);
+        buffer[length++] = qos;
+        return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
+    }
+    return false;
+}
+
+bool MQTTClient::unsubscribe(char* topic)
+{
+    if (connected()) {
+        uint16_t length = 5;
+        nextMsgId++;
+        if (nextMsgId == 0) {
+            nextMsgId = 1;
+        }
+        buffer[length++] = (nextMsgId >> 8);
+        buffer[length++] = (nextMsgId & 0xFF);
+        length = writeString(topic, buffer,length);
+        return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
+    }
+    return false;
+}
+
+void MQTTClient::disconnect()
+{
+    buffer[0] = MQTTDISCONNECT;
+    buffer[1] = 0;
+    _client->write(buffer,2);
+    _client->stop();
+    lastInActivity = lastOutActivity = time(NULL);
+}
+
+uint16_t MQTTClient::writeString(char* string, uint8_t* buf, uint16_t pos)
+{
+    char* idp = string;
+    uint16_t i = 0;
+    pos += 2;
+    while (*idp) {
+        buf[pos++] = *idp++;
+        i++;
+    }
+    buf[pos-i-2] = (i >> 8);
+    buf[pos-i-1] = (i & 0xFF);
+    return pos;
+}
+
+
+bool MQTTClient::connected()
+{
+    bool rc;
+    if (_client == NULL ) {
+        rc = false;
+    } else {
+        rc = (int)_client->connected();
+        if (!rc) _client->stop();
+    }
+    return rc;
+}
+
+
+