Sample MQTT program - simple send and receive

Dependencies:   C12832 MQTT

Dependents:   MQTT_G_SENSOR

This program and the MQTT libraries it uses are part of the Eclipse™ Paho project, specifically the embedded client.

This example and API are working, but are still in progress. Please give us your feedback.

HelloMQTT is an example of using the MQTT API. The MQTT API is portable across network interface stacks. MQTT is designed to be used with TCP/IP, but any transport with similar characteristics should be suitable.

HelloMQTT uses the NetworkInterface APIs in mbed OS 5 to show how this works. The MQTT library contains an MQTTNetwork.h header, which is a wrapper around the mbed networking interface. To switch between connectivity methods (the default is Ethernet) the easy-connect library is provided in this example application. You can change the connectivity method in mbed_app.json.

Adding new connectivity methods to the program is trivial, as long as they implement the mbed OS 5 NetworkStack API.

Committer:
icraggs
Date:
Sun May 11 18:52:57 2014 +0000
Revision:
6:e4c690c45021
Parent:
3:7a6a899de7cc
Child:
7:3de634f2d40c
Wildcard subscriptions

Who changed what in which revision?

User | Revision | Line number | New contents of line
icraggs 1:a1d5c7a6acbc 1 /*******************************************************************************
icraggs 1:a1d5c7a6acbc 2 * Copyright (c) 2014 IBM Corp.
icraggs 1:a1d5c7a6acbc 3 *
icraggs 1:a1d5c7a6acbc 4 * All rights reserved. This program and the accompanying materials
icraggs 1:a1d5c7a6acbc 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 1:a1d5c7a6acbc 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
icraggs 1:a1d5c7a6acbc 7 *
icraggs 1:a1d5c7a6acbc 8 * The Eclipse Public License is available at
icraggs 1:a1d5c7a6acbc 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 1:a1d5c7a6acbc 10 * and the Eclipse Distribution License is available at
icraggs 1:a1d5c7a6acbc 11 * http://www.eclipse.org/org/documents/edl-v10.php.
icraggs 1:a1d5c7a6acbc 12 *
icraggs 1:a1d5c7a6acbc 13 * Contributors:
icraggs 1:a1d5c7a6acbc 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 1:a1d5c7a6acbc 15 *******************************************************************************/
icraggs 2:638c854c0695 16
icraggs 2:638c854c0695 17 /**
icraggs 2:638c854c0695 18 This is a sample program to illustrate the use of the MQTT Client library
icraggs 2:638c854c0695 19 on the mbed platform. The Client class requires two classes which mediate
icraggs 2:638c854c0695 20 access to system interfaces for networking and timing. As long as these two
icraggs 2:638c854c0695 21 classes provide the required public programming interfaces, it does not matter
icraggs 2:638c854c0695 22 what facilities they use underneath. In this program, they use the mbed
icraggs 2:638c854c0695 23 system libraries.
icraggs 2:638c854c0695 24
icraggs 2:638c854c0695 25 */
icraggs 1:a1d5c7a6acbc 26
icraggs 0:0cae29831d01 27 #include "mbed.h"
icraggs 0:0cae29831d01 28 #include "EthernetInterface.h"
icraggs 2:638c854c0695 29
icraggs 0:0cae29831d01 30 #include "C12832_lcd.h"
icraggs 0:0cae29831d01 31 C12832_LCD lcd;
icraggs 0:0cae29831d01 32
icraggs 2:638c854c0695 33 #include "FP.cpp"
icraggs 2:638c854c0695 34 #include "MQTTClient.h"
icraggs 2:638c854c0695 35
icraggs 2:638c854c0695 36
icraggs 2:638c854c0695 37
icraggs 2:638c854c0695 38 class IPStack
icraggs 0:0cae29831d01 39 {
icraggs 2:638c854c0695 40 public:
icraggs 2:638c854c0695 41 IPStack()
icraggs 2:638c854c0695 42 {
icraggs 2:638c854c0695 43 eth.init(); // Use DHCP
icraggs 2:638c854c0695 44 eth.connect();
icraggs 2:638c854c0695 45 mysock.set_blocking(false, 1000); // 1 second Timeout
icraggs 2:638c854c0695 46 }
icraggs 2:638c854c0695 47
icraggs 2:638c854c0695 48 int connect(char* hostname, int port)
icraggs 2:638c854c0695 49 {
icraggs 2:638c854c0695 50 return mysock.connect(hostname, port);
icraggs 2:638c854c0695 51 }
icraggs 2:638c854c0695 52
icraggs 2:638c854c0695 53 int read(char* buffer, int len, int timeout)
icraggs 2:638c854c0695 54 {
icraggs 2:638c854c0695 55 mysock.set_blocking(false, timeout);
icraggs 2:638c854c0695 56 return mysock.receive(buffer, len);
icraggs 2:638c854c0695 57 }
icraggs 2:638c854c0695 58
icraggs 2:638c854c0695 59 int write(char* buffer, int len, int timeout)
icraggs 2:638c854c0695 60 {
icraggs 2:638c854c0695 61 mysock.set_blocking(false, timeout);
icraggs 2:638c854c0695 62 return mysock.send(buffer, len);
icraggs 2:638c854c0695 63 }
icraggs 2:638c854c0695 64
icraggs 2:638c854c0695 65 int disconnect()
icraggs 2:638c854c0695 66 {
icraggs 2:638c854c0695 67 return mysock.close();
icraggs 2:638c854c0695 68 }
icraggs 2:638c854c0695 69
icraggs 2:638c854c0695 70 private:
icraggs 2:638c854c0695 71
icraggs 2:638c854c0695 72 EthernetInterface eth;
icraggs 0:0cae29831d01 73 TCPSocketConnection mysock;
icraggs 2:638c854c0695 74
icraggs 2:638c854c0695 75 };
icraggs 2:638c854c0695 76
icraggs 2:638c854c0695 77
icraggs 2:638c854c0695 78 class Countdown
icraggs 2:638c854c0695 79 {
icraggs 2:638c854c0695 80 public:
icraggs 2:638c854c0695 81 Countdown()
icraggs 2:638c854c0695 82 {
icraggs 2:638c854c0695 83 t = Timer();
icraggs 2:638c854c0695 84 }
icraggs 2:638c854c0695 85
icraggs 2:638c854c0695 86 Countdown(int ms)
icraggs 2:638c854c0695 87 {
icraggs 2:638c854c0695 88 t = Timer();
icraggs 2:638c854c0695 89 countdown_ms(ms);
icraggs 2:638c854c0695 90 }
icraggs 2:638c854c0695 91
icraggs 0:0cae29831d01 92
icraggs 2:638c854c0695 93 bool expired()
icraggs 2:638c854c0695 94 {
icraggs 2:638c854c0695 95 return t.read_ms() >= interval_end_ms;
icraggs 2:638c854c0695 96 }
icraggs 2:638c854c0695 97
icraggs 2:638c854c0695 98 void countdown_ms(int ms)
icraggs 2:638c854c0695 99 {
icraggs 2:638c854c0695 100 t.stop();
icraggs 2:638c854c0695 101 interval_end_ms = ms;
icraggs 2:638c854c0695 102 t.reset();
icraggs 2:638c854c0695 103 t.start();
icraggs 2:638c854c0695 104 }
icraggs 2:638c854c0695 105
icraggs 2:638c854c0695 106 void countdown(int seconds)
icraggs 2:638c854c0695 107 {
icraggs 2:638c854c0695 108 countdown_ms(seconds * 1000);
icraggs 2:638c854c0695 109 }
icraggs 2:638c854c0695 110
icraggs 2:638c854c0695 111 int left_ms()
icraggs 2:638c854c0695 112 {
icraggs 2:638c854c0695 113 return interval_end_ms - t.read_ms();
icraggs 2:638c854c0695 114 }
icraggs 2:638c854c0695 115
icraggs 2:638c854c0695 116 private:
icraggs 2:638c854c0695 117 Timer t;
icraggs 2:638c854c0695 118 int interval_end_ms;
icraggs 2:638c854c0695 119 };
icraggs 0:0cae29831d01 120
icraggs 2:638c854c0695 121 int arrivedcount = 0;
icraggs 2:638c854c0695 122
icraggs 2:638c854c0695 123 void messageArrived(MQTT::Message* message)
icraggs 2:638c854c0695 124 {
icraggs 2:638c854c0695 125 lcd.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message->qos, message->retained, message->dup, message->id);
icraggs 2:638c854c0695 126 lcd.printf("Payload %.*s\n", message->payloadlen, (char*)message->payload);
icraggs 2:638c854c0695 127 ++arrivedcount;
icraggs 2:638c854c0695 128 }
icraggs 0:0cae29831d01 129
icraggs 2:638c854c0695 130
icraggs 2:638c854c0695 131 int main(int argc, char* argv[])
icraggs 2:638c854c0695 132 {
icraggs 2:638c854c0695 133 IPStack ipstack = IPStack();
icraggs 6:e4c690c45021 134 float version = 0.42;
icraggs 2:638c854c0695 135 char* topic = "mbed-sample";
icraggs 2:638c854c0695 136
icraggs 2:638c854c0695 137 lcd.printf("Version is %f\n", version);
icraggs 2:638c854c0695 138
icraggs 3:7a6a899de7cc 139 MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack);
icraggs 3:7a6a899de7cc 140
icraggs 6:e4c690c45021 141 char* hostname = "m2m.eclipse.org";
icraggs 6:e4c690c45021 142 int port = 1883;
icraggs 6:e4c690c45021 143 lcd.printf("Connecting to %s:%d\n", hostname, port);
icraggs 6:e4c690c45021 144 int rc = ipstack.connect(hostname, port);
icraggs 6:e4c690c45021 145 if (rc != 0)
icraggs 6:e4c690c45021 146 lcd.printf("rc from TCP connect is %d\n", rc);
icraggs 6:e4c690c45021 147
icraggs 6:e4c690c45021 148 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
icraggs 6:e4c690c45021 149 data.MQTTVersion = 3;
icraggs 6:e4c690c45021 150 data.clientID.cstring = "mbed-icraggs";
icraggs 6:e4c690c45021 151 rc = client.connect(&data);
icraggs 6:e4c690c45021 152 if (rc != 0)
icraggs 6:e4c690c45021 153 lcd.printf("rc from MQTT connect is %d\n", rc);
icraggs 2:638c854c0695 154
icraggs 2:638c854c0695 155 rc = client.subscribe(topic, MQTT::QOS1, messageArrived);
icraggs 2:638c854c0695 156 if (rc != 0)
icraggs 2:638c854c0695 157 lcd.printf("rc from MQTT subscribe is %d\n", rc);
icraggs 2:638c854c0695 158
icraggs 2:638c854c0695 159 MQTT::Message message;
icraggs 0:0cae29831d01 160
icraggs 2:638c854c0695 161 // QoS 0
icraggs 2:638c854c0695 162 char buf[100];
icraggs 2:638c854c0695 163 sprintf(buf, "Hello World! QoS 0 message from app version %f\n", version);
icraggs 2:638c854c0695 164 message.qos = MQTT::QOS0;
icraggs 2:638c854c0695 165 message.retained = false;
icraggs 2:638c854c0695 166 message.dup = false;
icraggs 2:638c854c0695 167 message.payload = (void*)buf;
icraggs 2:638c854c0695 168 message.payloadlen = strlen(buf)+1;
icraggs 2:638c854c0695 169 rc = client.publish(topic, &message);
icraggs 2:638c854c0695 170 while (arrivedcount == 0)
icraggs 2:638c854c0695 171 client.yield(100);
icraggs 2:638c854c0695 172
icraggs 2:638c854c0695 173 // QoS 1
icraggs 2:638c854c0695 174 sprintf(buf, "Hello World! QoS 1 message from app version %f\n", version);
icraggs 2:638c854c0695 175 message.qos = MQTT::QOS1;
icraggs 2:638c854c0695 176 message.payloadlen = strlen(buf)+1;
icraggs 2:638c854c0695 177 rc = client.publish(topic, &message);
icraggs 2:638c854c0695 178 while (arrivedcount == 1)
icraggs 2:638c854c0695 179 client.yield(100);
icraggs 2:638c854c0695 180
icraggs 2:638c854c0695 181 // QoS 2
icraggs 2:638c854c0695 182 sprintf(buf, "Hello World! QoS 2 message from app version %f\n", version);
icraggs 2:638c854c0695 183 message.qos = MQTT::QOS2;
icraggs 2:638c854c0695 184 message.payloadlen = strlen(buf)+1;
icraggs 2:638c854c0695 185 rc = client.publish(topic, &message);
icraggs 2:638c854c0695 186 while (arrivedcount == 2)
icraggs 2:638c854c0695 187 client.yield(100);
icraggs 2:638c854c0695 188
icraggs 2:638c854c0695 189 rc = client.unsubscribe(topic);
icraggs 2:638c854c0695 190 if (rc != 0)
icraggs 2:638c854c0695 191 lcd.printf("rc from unsubscribe was %d\n", rc);
icraggs 2:638c854c0695 192
icraggs 2:638c854c0695 193 rc = client.disconnect();
icraggs 2:638c854c0695 194 if (rc != 0)
icraggs 2:638c854c0695 195 lcd.printf("rc from disconnect was %d\n", rc);
icraggs 2:638c854c0695 196
icraggs 2:638c854c0695 197 ipstack.disconnect();
icraggs 2:638c854c0695 198
icraggs 2:638c854c0695 199 lcd.printf("Finishing with %d messages received\n", arrivedcount);
icraggs 2:638c854c0695 200
icraggs 0:0cae29831d01 201 return 0;
icraggs 0:0cae29831d01 202 }