An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSocket.h Source File

MQTTSocket.h

00001 #if !defined(MQTTSOCKET_H)
00002 #define MQTTSOCKET_H
00003 
00004 #include "MQTTmbed.h"
00005 #include <EthernetInterface.h>
00006 #include <Timer.h>
00007 
00008 class MQTTSocket
00009 {
00010 public:
00011     MQTTSocket(EthernetInterface *anet)
00012     {
00013         net = anet;
00014         open = false;
00015     }
00016     
00017     int connect(char* hostname, int port, int timeout=1000)
00018     {
00019         if (open)
00020             disconnect();
00021         nsapi_error_t rc = mysock.open(net);
00022         open = true;
00023         mysock.set_blocking(true);
00024         mysock.set_timeout((unsigned int)timeout);  
00025         rc = mysock.connect(hostname, port);
00026         mysock.set_blocking(false);  // blocking timeouts seem not to work
00027         return rc;
00028     }
00029 
00030     // common read/write routine, avoiding blocking timeouts
00031     int common(unsigned char* buffer, int len, int timeout, bool read)
00032     {
00033         timer.start();
00034         mysock.set_blocking(false); // blocking timeouts seem not to work
00035         int bytes = 0;
00036         bool first = true;
00037         do 
00038         {
00039             if (first)
00040                 first = false;
00041             else
00042                 wait_ms(timeout < 100 ? timeout : 100);
00043             int rc;
00044             if (read)
00045                 rc = mysock.recv((char*)buffer, len);
00046             else
00047                 rc = mysock.send((char*)buffer, len);
00048             if (rc < 0)
00049             {
00050                 if (rc != NSAPI_ERROR_WOULD_BLOCK)
00051                 {
00052                     bytes = -1;
00053                     break;
00054                 }
00055             } 
00056             else
00057                 bytes += rc;
00058         }
00059         while (bytes < len && timer.read_ms() < timeout);
00060         timer.stop();
00061         return bytes;
00062     }
00063 
00064     /* returns the number of bytes read, which could be 0.
00065        -1 if there was an error on the socket
00066     */
00067     int read(unsigned char* buffer, int len, int timeout)
00068     {
00069         return common(buffer, len, timeout, true);
00070     }
00071 
00072     int write(unsigned char* buffer, int len, int timeout)
00073     {
00074         return common(buffer, len, timeout, false);
00075     }
00076 
00077     int disconnect()
00078     {
00079         open = false;
00080         return mysock.close();
00081     }
00082 
00083     /*bool is_connected()
00084     {
00085         return mysock.is_connected();
00086     }*/
00087 
00088 private:
00089 
00090     bool open;
00091     TCPSocket mysock;
00092     EthernetInterface *net;
00093     Timer timer;
00094 
00095 };
00096 
00097 #endif