In this tutorial we will look in more detail at how to connect, publish and subscribe using the MQTT v3.1.1 synchronous client with a single thread.
We will not be using callbacks. If you use callbacks then a second thread is started automatically to process them.
If you don’t declare callbacks then you need to manually check for received messages using the MQTTClient_receive function.
This function needs to be called frequently, so it is usually placed in an infinite loop.
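The polling pattern described above can be sketched without the library. This is only an illustration: fake_message and fake_receive are hypothetical stand-ins for MQTTClient_message and MQTTClient_receive (the real function also takes the client handle, topic pointers and a timeout in milliseconds). The sketch shows the shape of the loop: poll, handle a message if one arrived, then carry on with other work.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical stand-in for a received message: a byte buffer with an
   explicit length and no terminating '\0'. */
typedef struct {
    const char *payload;
    int payloadlen;
} fake_message;

/* Hypothetical stand-in for MQTTClient_receive: "delivers" a message on
   every third poll and returns NULL otherwise, as if the timeout expired. */
static const fake_message *fake_receive(int poll)
{
    static const char raw[4] = { 'O', 'N', '?', '?' }; /* only 2 bytes are payload */
    static const fake_message msg = { raw, 2 };
    return (poll % 3 == 0) ? &msg : NULL;
}

/* Polls `polls` times and returns how many messages were handled. */
static int poll_loop(int polls)
{
    int handled = 0;
    assert(polls >= 0);
    for (int i = 0; i < polls; i++) {
        const fake_message *m = fake_receive(i);
        if (m != NULL) {
            /* %.*s prints exactly payloadlen bytes, so no terminator is needed */
            printf("Message received: %.*s\n", m->payloadlen, m->payload);
            handled++;
        }
        /* other periodic work, such as publishing, would go here */
    }
    return handled;
}
```

In a real client the for loop would be the infinite loop, and each handled message would also need to be freed once it has been processed.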
Basic Code Structure
In the previous tutorial, MQTT Client Basics, we looked at the general code structure, which is:
- Create a client object
- Set the options to connect to an MQTT server
- Set up callback functions if multi-threaded
- Subscribe to any topics
- Publish messages
- Handle any incoming messages
- Disconnect the client
- Free any memory being used by the client
Now we will look at an example client and its code.
The example code simulates a simple sensor that publishes data at regular intervals (10 secs) on topic MQTT-OUT and can also receive data (commands) on the topic MQTT-IN.
At the top of the code we declare the client handle and initialise the connection options and the publish message structure. The client itself is created later, in the main function.
The delivery token is actually the msgid that is present on messages published with a QoS of 1 or 2.
MQTTClient Client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
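Because we are using a single thread we need to call the MQTTClient_receive function at regular intervals.
This is a blocking call, so we keep the timeout short so that it does not delay the loop. The timeout argument is an unsigned long number of milliseconds, so a fractional value such as 0.01 would be truncated to 0; we use 10 (10 ms) instead.
We use the time function with a flag to decide when to call the publish function (every 10 seconds). When a message arrives, the library allocates the memory for the message and the topic name, so we free both after use. The code is below:
while (1)
{
    // timeout is in milliseconds
    rc = MQTTClient_receive(Client, &topicName, &topiclen, &message, 10);
    if (message)
    {
        printf("Message received on topic %s is %.*s.\n", topicName, message->payloadlen, (char*)(message->payload));
        // release the memory allocated by the library
        MQTTClient_freeMessage(&message);
        MQTTClient_free(topicName);
        topicName = NULL;
    }
    seconds = time(NULL);
    if (seconds % 10 == 0 && pub_flag == 0)
    {
        printf("publishing\n");
        pub_flag = 1; // don't want to publish too many messages
        pub_msg();
    }
    if (seconds % 10 == 1 && pub_flag == 1)
    {
        pub_flag = 0; // reset
    }
}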
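One thing to be aware of with the modulo test is that it depends on the loop running during the second in which the flag is reset. If pub_msg blocks for more than a second, the reset at seconds%10==1 can be missed and the following publish skipped. A possible alternative, sketched here as an assumption and not as part of the original example, is to remember when the last publish happened and compare the elapsed time. The helper name publish_due and the constant PUBLISH_INTERVAL are hypothetical.

```c
#include <assert.h>
#include <time.h>

#define PUBLISH_INTERVAL 10 /* seconds, matching the tutorial's interval */

/* Returns 1 when at least PUBLISH_INTERVAL seconds have passed since
   *last_pub, and records `now` as the new publish time.
   Returns 0 otherwise and leaves *last_pub unchanged. */
static int publish_due(time_t now, time_t *last_pub)
{
    assert(last_pub != NULL);
    if (difftime(now, *last_pub) >= PUBLISH_INTERVAL) {
        *last_pub = now;
        return 1;
    }
    return 0;
}
```

Inside the loop this would replace both if statements and the flag with a single test: if (publish_due(time(NULL), &last_pub)) pub_msg(); where last_pub is a time_t set before the loop starts.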
To avoid having to pass parameters into the pub_msg function, I declare most of the variables outside the main function.
The code will publish and receive messages until stopped using CTRL+C. The complete code is shown below:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "MQTTClient.h"

#define ADDRESS "tcp://localhost:1884"
#define CLIENTID "ExampleClientPub"
#define TOPIC_IN "MQTT-IN"
#define TOPIC_OUT "MQTT-OUT"
#define PAYLOAD "testing"
#define QOS 1
#define TIMEOUT 10000L

time_t seconds;
MQTTClient Client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;

void pub_msg()
{
    int rc;
    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = (int)strlen(pubmsg.payload);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;
    MQTTClient_publishMessage(Client, TOPIC_OUT, &pubmsg, &token);
    printf("Waiting here for up to %d seconds for publication of %s\n"
           "on topic %s for client with ClientID: %s\n",
           (int)(TIMEOUT/1000), PAYLOAD, TOPIC_OUT, CLIENTID);
    rc = MQTTClient_waitForCompletion(Client, token, TIMEOUT);
    if (rc == MQTTCLIENT_SUCCESS)
        printf("Message with delivery token %d delivered\n", token);
    else
        printf("Delivery of token %d not confirmed, return code %d\n", token, rc);
}

int main(int argc, char* argv[])
{
    int rc;
    char *topicName = NULL;
    int topiclen;
    MQTTClient_message *message = NULL;
    int pub_flag = 0;

    rc = MQTTClient_create(&Client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    conn_opts.keepAliveInterval = 120;
    conn_opts.cleansession = 1;
    if ((rc = MQTTClient_connect(Client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
    rc = MQTTClient_subscribe(Client, TOPIC_IN, QOS);
    printf("Subscribed %d \n", rc);

    while (1)
    {
        // timeout is in milliseconds
        rc = MQTTClient_receive(Client, &topicName, &topiclen, &message, 10);
        if (message)
        {
            printf("Message received on topic %s is %.*s.\n", topicName, message->payloadlen, (char*)(message->payload));
            // release the memory allocated by the library
            MQTTClient_freeMessage(&message);
            MQTTClient_free(topicName);
            topicName = NULL;
        }
        seconds = time(NULL);
        if (seconds % 10 == 0 && pub_flag == 0)
        {
            printf("publishing\n");
            pub_flag = 1; // don't want to publish too many messages
            pub_msg();
        }
        if (seconds % 10 == 1 && pub_flag == 1)
        {
            pub_flag = 0; // reset
        }
    }

    // not reached while the loop above runs forever
    MQTTClient_disconnect(Client, 10000);
    MQTTClient_destroy(&Client);
    return rc;
}
Related Tutorials and resources: