Working on MQTT, a machine-to-machine (M2M) "Internet of Things" connectivity protocol.

What should an mbed MQTT API look like?

18 Mar 2014

Up to now, we have had two major styles of MQTT APIs, which are exemplified by the Paho C and Java client libraries. The first style was designed to make MQTT programming a little easier. To that end, all of the calls block to some extent:

  • the connect method waits for a connack before returning
  • the un/subscribe methods wait for the un/suback before returning
  • the publish method waits for the publish packet to be written to the network. It also only allows a limited number of MQTT messages to be inflight at any one time - it will block until some QoS 1 or 2 exchanges have completed when that limit would be exceeded. The limit is typically 10.

This, synchronous, style, as we call it, is described for the C client here: http://www.eclipse.org/paho/files/mqttdoc/Cclient/index.html, and for the Java client: http://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttClient.html.

In this style, there are 3 callbacks:

  • messageArrived - when a publish is received from the server
  • connectionLost - when the connection to the server is lost - the application can reconnect here
  • messageDelivered - when a QoS 1 or 2 message has been sent to a server

The second style, asynchronous, was intended to be used in environments where the application does not have the main thread of control, and blocking calls are not acceptable. Thus every API call indicates its success or failure by means of a callback, and never blocks.

This style of API is somewhat harder to use, providing more ability to shoot yourself in the foot, but is more flexible than the synchronous API - more operations can be executed in parallel. This style of API is described for the C client here: http://www.eclipse.org/paho/files/mqttdoc/Casync/index.html and for the Java client: http://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttAsyncClient.html. The Javascript API is in this style because it is intended to be used in a web browser using websockets.

The style of programs using the asynchronous API becomes quite different, and is one I have come to like. The whole program can become a series of callbacks. Connect, then wait for the connect callback, and in that callback, subscribe, and so on.

An mbed API could look like one of these APIs but with extra callbacks for the networking calls, to allow various networking APIs to be substituted.

18 Mar 2014

I looked at the C++ and Java API and both look nice. Could one API serve as blocking and non-blocking depending if a callback is provided or not? There may be some details under the hood that make this difficult but from the perspective of user experience and coherency, I think it could be quite nice!

Typically mbed callbacks have been a single object rather than a class that needs to be inherited and overridden but those are details. Here are some examples and the official SDK implementation...

19 Mar 2014

Hi,

I'd like to come up with a general blocking/non-blocking API style we can use throughout. The default I think would be:

ret func(args);
void func(args, callback(ret));

Where callback is a pointer to a function, or pointer to class instance method (or lambda/closure if we moved to something like C++11)

The alternative as you say is a callback class you overwrite, but I'm less convinced about that; when you have situations with two natural call backs, simple static functions, lambdas, it becomes cumbersome IMO - it is all a bit Java :)

Simon

19 Mar 2014

There are two types of callbacks:

  • those that are initiated by the application - those that indicate the sucess or failure of a method call, like subscribe
  • those that indicate asynchronous events which can happen at any time - the arrival of a message or breaking of a connection (connectionLost).

The second type of callback is always needed (unless you have another way of signalling asynchronous events) whereas the first type could be optional. The first type is generally associated with a method call, so the function pointer can be a parameter to that method. The second type has no method call to be associated with, and so we have up to now associated that with the client object.

21 Mar 2014

Also now member of the team :)

I really assume the synchronous model is easier to use, and also results in easier to understand code, instead of a string of callbacks. What I then wonder, is requiring RTOS an option? If you can then just make a thread where the MQTT is running in, then it is first of all clear, easy to understand, code, etc, and it doesn't have to block your entire device. There would be no reason to have a non-blocking version of connect for example, since connecting is simply the first task of the MQTT thread, and that need to block before anything else can happen.

For actually publishing packets, I don't know how that goes exactly, but I can imagine it is nice to have the option to start second a packet before the previous one is completely handled.

21 Mar 2014

It would be nice to have unique callback paths for each topic you subscribe to:

fnptrA = subscribe("topicA", callbackFnA(buffer,buffer_length))

fnptrB = subscribe("topicB", callbackFnB(buffer,buffer_length))

Also, I think that the callback function should be capable of using a supplied buffer (for storing the payload) that is given via its registration (above).

It would be cool to have the ability to register multiple callbacks with a given topic:

fnptrA1 = subscribe("topicA", callbackFnA1(buffer,buffer_length))

fnptrA2 =subscribe("topicA", callbackFnA2(buffer,buffer_length)) ...

Adding and removing registered callback pointers would also be nice: deregisterCallback(fnptrA); ...

Additionally, the current MQTT library has issues when trying to send more than the 128byte default payload limit (and I believe there are issues with the code if you try to change that default define... to accept larger limits...). It would be great to support the full payload length that MQTT (as a specification) supports (or at least make it configurable and expandable if needed).

The current MQTT library does not support wildcards in the topic name - this is a really cool feature in MQTT in that you can address whole sets of endpoints with wildcards. Wildcard support typically comes into play during the subscription:

fnptrA = subscribe("topic/tree/#", callbackFnA(buffer,buffer_length));

Lastly, it would be great to have the library support MQTT wills.

24 Mar 2014

Erik,

using a background thread is definitely what I would propose, unless a single-threaded application was essential (that is not the case here it seems). When I talk about blocking or non-blocking, this refers to the main application thread, not the background task. The application needs to know when an operation (connect, subscribe, publish) has finished and worked, to be able to know that subsequent operations will work predicatbly. A blocking API will let the application know by blocking until it can return the status of the operation. A non-blocking API will return the result in a callback some time later, allowing the application to continue running in the meantime.

24 Mar 2014

Doug,

the problem with having a callback per subscription (or subscription objects) is that MQTT does not tell you which subscription was used to match a publication when you have subscriptions with wildcards. This is why MQTT client libraries do not in general do this. To make this work you have to have subscription matching code in the client library, which has two problems 1) increasing the size of the client library and 2) second guessing the algorithm used in the server.

You mention the "current MQTT library" not supporting messages > 128 bytes, will messages and topic wildcards. Which library is that? The base library I have written does support those things, and any follow on library will support all MQTT features.

24 Mar 2014

@Ian, If you use a background thread, then having it blocking would make very little sense. Then the entire idea of a background thread is ignored.

I can imagine using RTOS function you can already do some signalling between application/MQTT thread. But also for example might it not be just some kind of function the main thread can call if MQTT is connected?

24 Mar 2014

Erik,

you need a background thread to receive publish messages arriving from the server asynchronously and to keep the session alive, regardless of whether you have a blocking call to connect/subscribe etc. Otherwise you have to make a call to the API at least within the keepalive interval to keep the session active. This is why there are 3 types of C MQTT APi in Paho (http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.c.git/):

  1. blocking single-threaded (the application must call yield or other API call within the keepalive interval)
  2. blocking multi-threaded, and
  3. non-blocking multi-threaded.

If each of the calls accepts a callback but blocks if no callback is provided, then we can cater for cases 2 and 3 in one API.

24 Mar 2014

The library I refer to is the one that originated from an arduino port (I believe its called PubSubClient) - the code has a constant (MQTT_MAX_PACKET_SIZE) that, if changed, does not seem to work entirely right. For example, if I change it from 128 bytes to 512 bytes and then try to send a 512 byte MQTT payload, things hang. I've got 3 different programs that tried to do this and none of them work with the increased payload constant - they all lock up when trying to process larger payloads. Looking down into PubSubClient.cpp, in addition to the constant above, there are also references to "128" (literal constant) that I am unsure of (but suspect) they have to do with the MQTT_MAX_PACKET_SIZE constant in some way. When I change MQTT_MAX_PACKET_SIZE, those additional literals do not get changed (hence my theory for the lock up). I've tried twiddling with those literals with some marginal success (though not completely...)

Regarding the callback suggestions, I believe you should be able to pull the endpoint target from the wildcard during the receive if I am not mistaken - this would give you insight into which callback you should invoke. If it could be done, I think it would make for some really slick/cool processing of events generated by MQTT.

Doug

25 Mar 2014

Doug,

I don't intend to touch or fix that particular library. We're planning a new one which will work for the full packet size.

"I believe you should be able to pull the endpoint target from the wildcard during the receive if I am not mistaken" The receive gets the specific topic that the message was sent to, not the wildcard subscription that matched. To work out which subscription matched, the client library has to work that out itself, by reimplementing code that exists and has been executed in the server.

The MQTT spec also allows servers to send separate messages for each matching subscription for a single message sent to the server (if several wildcard subscriptions match). In this case a single incoming message could result in multiple messages to the client, and then multiple subscription object callbacks for each of those incoming messages.

I didn't really want to have to reimplement subscription matching logic in the client. But we could go that route if we are prepared also to explain the consquences of multiple wildcard subscriptions which intersect with each other. You might end up "receiving" many more messages than you intended. This may be an edge case which rarely occurs, but I can envisage having to explain it several times.

25 Mar 2014

It would be nice to have unique callback paths for each topic you subscribe to:

fnptrA = subscribe("topicA", callbackFnA(buffer,buffer_length))

fnptrB = subscribe("topicB", callbackFnB(buffer,buffer_length))

That works easily for non-wildcard topics. For wildcards, topic matching code will be needed. You would also need success/failure callbacks.

Also, I think that the callback function should be capable of using a supplied buffer (for storing the payload) that is given via its registration (above).

That is not a problem, except for the question of what action the client library should take if a message is received which exceeds the buffer length. Throw an exception and disconnect the client is one option. There would be more parameters to the callback than just the payload - retained flag, maybe QoS, maybe duplicate, maybe message id - a message class.

It would be cool to have the ability to register multiple callbacks with a given topic:

fnptrA1 = subscribe("topicA", callbackFnA1(buffer,buffer_length))

fnptrA2 =subscribe("topicA", callbackFnA2(buffer,buffer_length)) ...

Adding and removing registered callback pointers would also be nice: deregisterCallback(fnptrA); ...

Roughly equivalent to MQTT unsubscribe.

Additionally, the current MQTT library has issues when trying to send more than the 128byte default payload limit (and I believe there are issues with the code if you try to change that default define... to accept larger limits...). It would be great to support the full payload length that MQTT (as a specification) supports (or at least make it configurable and expandable if needed).

Not a problem supporting the full payload length.

The current MQTT library does not support wildcards in the topic name - this is a really cool feature in MQTT in that you can address whole sets of endpoints with wildcards. Wildcard support typically comes into play during the subscription:

fnptrA = subscribe("topic/tree/#", callbackFnA(buffer,buffer_length));

Not a problem except for the extra work involved in matching incoming messages to subscriptions. Not sure why you think the current MQTT library does not support wildcards though, I didn't see anything specific in the code.

Lastly, it would be great to have the library support MQTT wills.

Again, no problem.

26 Mar 2014

How about something like this?

class MQTTConnection;

class MQTTResult
{
	/* success or failure result data */
};

class MQTTMessage
{
	int qos;
	int retained;
	int dup;
	int msgid;
    void *payload;
    size_t payloadlen;
};
  
FP<void, char* topic, MQTTMessage*>  messageHandler;

FP<void, MQTTConnection*, MQTTResult*> resultHandler;

class MQTTConnection 
{
    
public:    
    
    MQTTConnection();
    
    int connect(MQTTPacket_connectData connectOptions options, resultHandler rh); // or start?
    
    int publish(char* topic, MQTTMessage* message, resultHandler rh);  //  or send?
    
    int subscribe(char* topic_filter, int qos, messageHandler mh, resultHandler rh);
    
    int unsubscribe(char* topic_filter, resultHandler rh);
    
    int disconnect(int timeout, resultHandler rh);  // or stop?     
    
};
30 Apr 2014

What about power efficiency and multiple connections. Why not focus on creation of some solution for WIZnet hardwire TCP/IP modules such as W5500? Why you try to get something from software when there are reliable hardware there already? So you will offload the MCU to do other task or even sleep or deep sleep.

30 Apr 2014

MQTT (mqtt.org) is a very lightweight messaging protocol which sits above TCP/IP. Its intention is to make it easy to distribute data in an event-driven manner between applications. It's easier than socket programming.

This API can be easily used on any TCP stack. MQTT is power efficient because it is lightweight. If you want multiple connections, you can use multiple MQTT Client objects. See the API example at: http://mbed.org/teams/mqtt/code/HelloMQTT/

30 Apr 2014

Hi Ian,

Do you have any comparison between other alternatives? Where your messaging protocol is better comparing to a custom solution over sockets? How many resource it will take and how easy is to profile the resource requirement for that extra layer which you creating?

thanks, Stas

01 May 2014

Hi Stas,

the best place to start is mqtt.org. There's an FAQ page and lots of other pointers to helpful information. The docs page has a list of some projects that use MQTT. The software page has a list of client libraries and servers.

A good way to find out how easy it is to distribute information with MQTT is to try it for yourself. The Mosquitto open source server (http://mosquitto.org/) is one of the most widely used. Example MQTT programs for mbed are here: http://mbed.org/teams/mqtt/code/MQTTPacket/file/1b8fb13fc6ef/samples/simple-publish.txt, and here: http://mbed.org/teams/mqtt/code/HelloMQTT/file/638c854c0695/main.cpp.

Ian

01 May 2014
01 May 2014

Hi Ian,

Thanks for info. Seems to be the same principle as Bluetooth LE protocol. But can you get the notification without connection? For instance in BLE you have a NON-Connectable Advertising. Still interesting to see the resource requirement for that layer.

Thanks, Stas

02 May 2014

In pure MQTT you can't get notification without a TCP connection. MQTT-SN, a variant for connectionless transports like UDP, does allow multicast, but in pure MQTT you have to establish a connection, or use another method.

09 May 2014

Talk about complicating a single board WAY TOO MUCH...

10 May 2014

A good API simplifies the job at hand - in this case getting messages from one application to another, reliably, using various underlying transports - and makes it easier extend that small project into a bigger one. You can also use the simpler MQTTPacket API if you like :-)

07 Jan 2015

hi,Ian, I have 2 questions: 1.MQTTClient.h > SUBACK return value is a list, so grantedQoS should be an Int array,we can also subscribe a list of topic instead of just one topic?

if (waitfor(SUBACK, timer) == SUBACK) wait for suback { int count = 0, grantedQoS = -1; unsigned short mypacketid; if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)

2. MQTTAsync.h > Mbed Thread constructor is not that format.

start background thread this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);

Thank you

07 Jan 2015

hi,Ian, I have 2 questions: 1.MQTTClient.h > SUBACK return value is a list, so grantedQoS should be an Int array,we can also subscribe a list of topic instead of just one topic?

if (waitfor(SUBACK, timer) == SUBACK) wait for suback { int count = 0, grantedQoS = -1; unsigned short mypacketid; if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)

2. MQTTAsync.h > Mbed Thread constructor is not that format.

start background thread this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);

Thank you

07 Jan 2015

hi,Ian, I have 2 questions: 1.MQTTClient.h > SUBACK return value is a list, so grantedQoS should be an Int array,we can also subscribe a list of topic instead of just one topic?

if (waitfor(SUBACK, timer) == SUBACK) wait for suback { int count = 0, grantedQoS = -1; unsigned short mypacketid; if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)

2. MQTTAsync.h > Mbed Thread constructor is not that format.

start background thread this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);

Thank you

07 Jan 2015

hi,Ian, I have 2 questions: 1.MQTTClient.h > SUBACK return value is a list, so grantedQoS should be an Int array,we can also subscribe a list of topic instead of just one topic?

if (waitfor(SUBACK, timer) == SUBACK) wait for suback { int count = 0, grantedQoS = -1; unsigned short mypacketid; if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)

2. MQTTAsync.h > Mbed Thread constructor is not that format.

start background thread this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);

Thank you

07 Jan 2015

hi,Ian, I have 2 questions: 1.MQTTClient.h > SUBACK return value is a list, so grantedQoS should be an Int array,we can also subscribe a list of topic instead of just one topic?

if (waitfor(SUBACK, timer) == SUBACK) wait for suback { int count = 0, grantedQoS = -1; unsigned short mypacketid; if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)

2. MQTTAsync.h > Mbed Thread constructor is not that format.

start background thread this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);

Thank you

07 Jan 2015

hi,Ian, I have 2 questions: 1.MQTTClient.h > SUBACK return value is a list, so grantedQoS should be an Int array,we can also subscribe a list of topic instead of just one topic?

if (waitfor(SUBACK, timer) == SUBACK) wait for suback { int count = 0, grantedQoS = -1; unsigned short mypacketid; if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)

2. MQTTAsync.h > Mbed Thread constructor is not that format.

start background thread this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);

Thank you

07 Jan 2015

HI Terrence,

1) the MQTTClient API is intended to be for simple use, so the subscribe call accepts only one topic, and correspondingly returns one granted QoS. We could add a separate subscribe call which accepts multiple topics, and returns a list of granted QoS. Most applications do not use multiple topics in a subscribe call, which is why I viewed it as low priority.

2) The MQTTAsync API is still under development and not ready to be used, so I should remove that for now.

Ian

Please log in to post a reply.