To receive messages on a topic you will need to subscribe to the topic or topics.
To subscribe to a topic you use the subscribe method of the Paho MQTT client object.
In this tutorial we will look at some examples of using the subscribe method.
The diagram below illustrates the subscribe message flow.
The subscribe method accepts two parameters, a topic (or topics) and a QoS (quality of service), shown here with their default values: subscribe(topic, qos=0)
The documentation lists three ways of calling the subscribe method.
Method 1 - Uses a single topic string, with the QoS passed as a separate argument: subscribe(topic, qos)
Method 2 - Uses a single tuple for the topic and QoS: (topic, qos)
Method 3 - Used for subscribing to multiple topics; takes a list of tuples: [(topic1, qos), (topic2, qos), (topic3, qos)]
The subscribe function returns a tuple (result, mid): a result code that indicates whether the request was sent successfully, and a message id that is used as a tracking code.
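The three call forms can be sketched as follows. This is only an illustration: `client` is assumed to be an already connected paho `Client` object, and the topic names are made up.

```python
def subscribe_three_ways(client):
    """Issue one subscribe request per documented call form.

    `client` is assumed to be a connected paho.mqtt.client.Client;
    the topic names are hypothetical.
    """
    results = []
    # Method 1: topic string, QoS as a separate argument
    results.append(client.subscribe("house/bulb1", 1))
    # Method 2: a single (topic, qos) tuple
    results.append(client.subscribe(("house/bulb2", 2)))
    # Method 3: a list of (topic, qos) tuples in one request
    results.append(client.subscribe([("house/bulb3", 0), ("house/bulb4", 1)]))
    return results  # each entry is a (result, mid) tuple
```

Each call returns its own (result, mid) pair, so three calls mean three message ids to track.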
Subscribe Acknowledgements
The MQTT broker/server will acknowledge subscriptions which will then generate an on_subscribe callback.
The format of the callback function (paho-mqtt 1.x) is: on_subscribe(client, userdata, mid, granted_qos)
The mid value can be compared with the one returned by the function call to check for successful subscription requests.
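One way to do the comparison is to remember each mid until its acknowledgement arrives. The sketch below assumes the paho-mqtt 1.x callback signature; the class name `SubscriptionTracker` is my own and not part of the library.

```python
class SubscriptionTracker:
    """Remember the mid of each subscribe call until its SUBACK arrives."""

    def __init__(self):
        self.pending = {}    # mid -> topic(s) awaiting acknowledgement
        self.confirmed = {}  # mid -> granted QoS values from the broker

    def subscribe(self, client, topic, qos=0):
        result, mid = client.subscribe(topic, qos)
        if result == 0:  # 0 is MQTT_ERR_SUCCESS
            self.pending[mid] = topic
        return result, mid

    # Assumed paho-mqtt 1.x callback signature
    def on_subscribe(self, client, userdata, mid, granted_qos):
        if mid in self.pending:
            self.pending.pop(mid)
            self.confirmed[mid] = granted_qos
```

You would attach it with `client.on_subscribe = tracker.on_subscribe` and call `tracker.subscribe(client, topic, qos)` instead of calling subscribe directly. Anything still in `tracker.pending` has not been acknowledged yet.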
MQTT Subscribe Success and Failure Examples:
We will use a simple Python subscribe script to show:
- Successful subscription
- Failed subscription raising a ValueError. To make this fail I set the QoS to 5, which is invalid.
- Failed subscription with an MQTT_ERR_NO_CONN. To make this fail I didn’t connect to the broker before I tried to subscribe.
Notes:
In the screen shot above you can see in the successful subscribe that the mid (message ID) values that are printed match.
This shows that the subscribe was successful.
You would use the mid values when checking on multiple subscriptions, as some might fail and others succeed.
Also note the result value of 0, which is a success code (MQTT_ERR_SUCCESS).
The last example shows a failure due to a connection error, so the mid value is None. The MQTT_ERR_NO_CONN value is 4, which indicates a connection failure.
The table below, taken from the client source, shows the other error codes and their meaning:
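As a rough sketch, the result codes can be kept in a dictionary and used to report both kinds of failure in one place. The numeric values are the ones I know from the paho client source, so check them against your installed version. The helper name `try_subscribe` is hypothetical.

```python
# Result codes as defined in paho.mqtt.client (verify for your version)
MQTT_ERR_NAMES = {
    -1: "MQTT_ERR_AGAIN", 0: "MQTT_ERR_SUCCESS", 1: "MQTT_ERR_NOMEM",
    2: "MQTT_ERR_PROTOCOL", 3: "MQTT_ERR_INVAL", 4: "MQTT_ERR_NO_CONN",
    5: "MQTT_ERR_CONN_REFUSED", 6: "MQTT_ERR_NOT_FOUND",
    7: "MQTT_ERR_CONN_LOST", 8: "MQTT_ERR_TLS", 9: "MQTT_ERR_PAYLOAD_SIZE",
    10: "MQTT_ERR_NOT_SUPPORTED", 11: "MQTT_ERR_AUTH",
    12: "MQTT_ERR_ACL_DENIED", 13: "MQTT_ERR_UNKNOWN", 14: "MQTT_ERR_ERRNO",
    15: "MQTT_ERR_QUEUE_SIZE",
}


def try_subscribe(client, topic, qos=0):
    """Return (ok, detail) instead of letting a bad argument raise."""
    try:
        result, mid = client.subscribe(topic, qos)
    except ValueError as exc:  # for example a QoS outside 0..2
        return False, f"invalid argument: {exc}"
    if result != 0:
        return False, MQTT_ERR_NAMES.get(result, f"error {result}")
    return True, mid  # on success the detail is the message id
```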
Subscribing to Multiple MQTT Topics
In this example we will look at the different ways of subscribing to multiple MQTT topics.
You can subscribe by:
- Calling the subscribe function for each topic.
- Calling the subscribe function with a topic list.
The screen shot below illustrates both methods.
Firstly we use single topic subscriptions to subscribe to 2 topics. Notice that two messages are sent and we would need to track two message ids.
In the second example we subscribe to 3 topics using a list. Notice that a single subscribe message is sent, and results in a single acknowledgement.
Notice also how the message ids match.
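A minimal sketch of the list form, assuming every topic should use the same QoS. The helper name `subscribe_all` is hypothetical and `client` is assumed to be a connected paho client.

```python
def subscribe_all(client, topics, qos=0):
    """Subscribe to every topic in `topics` with a single subscribe call."""
    topic_list = [(topic, qos) for topic in topics]
    result, mid = client.subscribe(topic_list)
    # One call means one mid to match against the on_subscribe callback.
    return result, mid, topic_list
```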
Setting The Quality of Service
If you want to try and ensure that the subscriber gets a message then you need to subscribe with a quality of service of 1 or 2. See
However even this doesn’t guarantee that they will receive the message, as you also need to have subscribed with a persistent connection (not the default). See Clean Sessions and QOS (quality of service) -Example.
When you subscribe, the SUBACK message tells you what QoS the broker has granted to the subscription.
Although it could be downgraded, it is usually the same as the QoS you set in the subscription request.
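The granted values arrive in the on_subscribe callback, one per requested topic, so they can be compared with what was asked for. The sketch below assumes MQTT 3.1.1, where the specification reserves the value 0x80 for a refused subscription. Whether a broker actually sends it depends on the broker and its version. The function name is my own.

```python
SUBACK_FAILURE = 0x80  # MQTT 3.1.1 return code for a refused subscription


def review_granted(requested, granted_qos):
    """Compare requested (topic, qos) pairs with the SUBACK's granted values."""
    report = []
    for (topic, wanted), granted in zip(requested, granted_qos):
        if granted == SUBACK_FAILURE:
            status = "refused"
        elif granted < wanted:
            status = "downgraded"
        else:
            status = "granted"
        report.append((topic, status))
    return report
```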
Subscribe Wait Loops
In my simple demo scripts I don’t process the callbacks to check for successful subscriptions, but in real scripts you will need to.
Generally it will make no sense proceeding unless you have successfully subscribed.
Therefore I tend to use the following format:
- Subscribe
- Wait for success or quit
- Proceed
Here is a snippet of code from my wait_for function, which I use to wait for connections and subscriptions:
def wait_for(client, msgType, period=0.25):
    if msgType == "SUBACK":
        if client.on_subscribe:
            while not client.suback_flag:
                logging.info("waiting suback")
                client.loop()  # check for messages
                time.sleep(period)
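A wait loop like this never gives up if the acknowledgement is lost. Below is a sketch of a variant with a timeout, which covers the "wait for success or quit" step. It assumes the flag is set by your own callback and that you are not also running loop_start(). The name `wait_for_flag` is hypothetical.

```python
import time


def wait_for_flag(client, flag_name, timeout=5.0, period=0.25):
    """Poll client.<flag_name> until it is truthy or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while not getattr(client, flag_name, False):
        if time.monotonic() >= deadline:
            return False  # caller decides whether to retry or quit
        client.loop(period)  # process network traffic so callbacks can fire
    return True
```

Typical use would be `if not wait_for_flag(client, "suback_flag"): sys.exit(1)` straight after the subscribe call.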
Clean Session Flag and Remembering Subscriptions
If you connect using the clean session flag set to False then the next time you connect you won’t need to subscribe as the broker remembers your subscriptions.
You can use the unsubscribe method to cancel existing subscriptions.
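With paho-mqtt 1.x the on_connect callback receives a flags dictionary whose "session present" entry tells you whether the broker kept a session for this client id. A sketch that subscribes only when no session was found is shown below. `TOPICS` is a hypothetical constant.

```python
TOPICS = [("house/bulb1", 1)]  # hypothetical subscriptions


def on_connect(client, userdata, flags, rc):
    """Subscribe only when the broker has no stored session for this client."""
    if rc != 0:
        return  # connection refused, nothing to subscribe to
    if flags.get("session present"):
        return  # broker remembered the earlier subscriptions
    client.subscribe(TOPICS)
```

This assumes the client was created with clean_session=False and a fixed client id. Without both the broker has no session to remember.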
Subscribing to Restricted Topics
Brokers like the Mosquitto broker allow you to control access to topics.
This means that some topics may be restricted, and you are unable to subscribe to them.
Unfortunately the client doesn’t receive any indication of this, and so it appears that the subscription has been successful but it has not. See Using the ACL to restrict topic access on the Mosquitto broker.
The only way to be sure that a subscribe has really succeeded is to receive messages on that topic.
Effects of Retained Messages
When you subscribe to a topic that has had messages published with the retain flag set then you will receive any retained messages.
The client can detect that a message is a retained message by examining the message.retain flag in the on_message callback, as shown in the code snippet below:
def on_message(client, userdata, message):
    print("message received ", str(message.payload.decode("utf-8")),
          "topic", message.topic, "retained ", message.retain)
    if message.retain == 1:
        print("This is a retained message")
– See Understanding Retained messages
Unsubscribing
If you no longer need to receive messages on a topic you can unsubscribe from that topic using the unsubscribe method.
The method accepts only one argument, the topic.
The call returns a return code and mid value like the subscribe method, and is acknowledged by an UNSUBACK response which triggers the on_unsubscribe callback function.
Again you can compare the mid values in the response code and on_unsubscribe callback to check for success or failure.
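The same mid bookkeeping works for unsubscribing. This sketch assumes the paho-mqtt 1.x on_unsubscribe signature (client, userdata, mid). The class name is my own.

```python
class UnsubscribeTracker:
    """Track unsubscribe requests until the broker acknowledges them."""

    def __init__(self):
        self.pending = set()  # mids still waiting for an UNSUBACK

    def unsubscribe(self, client, topic):
        result, mid = client.unsubscribe(topic)
        if result == 0:  # 0 is MQTT_ERR_SUCCESS
            self.pending.add(mid)
        return result, mid

    # Assumed paho-mqtt 1.x callback signature
    def on_unsubscribe(self, client, userdata, mid):
        self.pending.discard(mid)

    def all_acknowledged(self):
        return not self.pending
```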
Subscribe Suggestions
- Use a try block on the subscribe call.
- Check the return code result value for a success
- Check that all subscriptions have been successful by examining the mid value in the callback.
- Use a wait loop to wait for successful subscriptions before proceeding.
- Consider publishing to the same topic to verify subscription.
Video
If you prefer video, I’ve created a YouTube video that covers the subscribe process that you may enjoy watching.
There is another video that covers subscribe acknowledgements.
Next—>MQTT Publish-Python Client Example
Common Questions and Answers
Q- I have subscribed to a topic when using a persistent connection. I’ve been disconnected for a few hours what happens when I re-connect?
A– When you reconnect you will see all messages that have been published on that topic while you were disconnected provided the messages were published with a QOS of 1 or 2 and that you originally subscribed with a QOS of 1 or 2.
Q- How do I subscribe to all topics?
A- Subscribe to #
Course Links
- Introduction to the Paho Python MQTT Client
- Introduction to the Client Class
- Connecting to a Broker
- Publishing Using The Paho Python MQTT Client
- –Subscribing using The Paho Python Client
- Receiving Messages with the Paho MQTT Python Client
- Understanding The Loop
- Understanding Callbacks
- Handling Multiple Client Connections
Related tutorials and Resources
- Understanding MQTT Topics
- MQTT Publish and Subscribe for Beginners
- Video –Checking MQTT Subscribe Acknowledgements
UPDATE of my previous question
Hello Steve,
am I right with MQTT when I want to do the following?
My system needs to tell to specific machines what they have to do, after evaluation of the situation minute by minute. I need a kind of asynchronous communication to the machines with each one getting their own instruction (machine A goto location B, machine C goto location D, …) .
Can I use for example a topic string where I integrate a unique identifier for the machine?
Code running on machine A:
UNIQUE_IDENTIFIER_A = "AF18CD22BE87" #Unique identifier for the machine A
client.subscribe(UNIQUE_IDENTIFIER_A+"/instruction",1)
Code running on machine C:
UNIQUE_IDENTIFIER_C = "DA88FA92EC00" #Unique identifier for the machine C
client.subscribe(UNIQUE_IDENTIFIER_C+"/instruction",1)
Or can I somehow use the client_name or client_id? Or do you think that it is not an MQTT topic?
Best regards,
Marcus
A client identifier will work just as well. Take a read of this tutorial as it goes into more detail on topic design.
http://www.steves-internet-guide.com/mqtt-topic-payload-design-notes/
rgds
steve
Hello Steve,
am I right with MQTT when I want to do the following?
My system needs to tell to specific machines what they have to do, after evaluation of the situation minute by minute. I need a kind of asynchronous communication to the machines with each one getting their own instruction (machine A goto location B, machine C goto location D, …) .
Can I use for example a topic where I integrate a unique identifier for the machine?
Code running on machine A:
client.subscribe("/instruction",1)
Code running on machine C:
client.subscribe("/instruction",1)
Or can I somehow use the client_name or client_id?
Or do you think that it is not an MQTT topic?
Best regards,
Marcus
I would create a topic structure like
topic_base/machine1/commands
topic_base/machine1/responses
and the same for each machine.
See this tutorial for an example of use.
http://www.steves-internet-guide.com/controlling-devices-mqtt-python/
rgds
steve
Hi Steve,
thank you. I will have a look to the link.
Sorry, I posted the question twice because there was information lost due to angle brackets . But you can ignore the second one.
Best regards,
Marcus
I have a question about the subscribe. Messages are not always received and therefore nothing happens. However, the message is received when I look at the topic in MQTT explore, not 100% of the time of messages being published are received but far more often than messages received by the python code. Does someone have an idea what could cause this?
The code is the following:
import json
import paho.mqtt.client as mqtt
from gpiozero import LED
broker = "localhost"
port = 1883
waterPumpState = "off"
topic = "camper/actuators/waterpump/"
led_4 = LED(4)
# The callback for when the client connects to the broker
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK Returned code=", str(rc))
        pass
    else:
        print("Bad connection Returned code=", str(rc))
    client.subscribe(topic + "in", 0)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, message):
    jsonString = message.payload.decode('utf-8')
    waterPump = json.loads(jsonString)
    waterPumpState = waterPump["state"]
    if waterPumpState == "on" and not led_4.is_lit:
        print("on")
        led_4.on()
    elif waterPumpState == "off" and led_4.is_lit:
        print("off")
        led_4.off()
# Create instance of client with client ID "digi_mqtt_test"
client = mqtt.Client("mosquitto")
client.on_connect = on_connect # Define callback function for successful connection
client.on_message = on_message # Define callback function for receipt of a message
client.connect(broker, port)
client.loop_forever() # Start networking daemon
The code looks ok. Where are the messages coming from, and how do you know some are missing? Is there a counter?
rgds
Steve
Steve thanks for helping out!
The problem turned out to be that the client IDs were not unique. After fixing this, the subscriber stopped reconnecting constantly, which solved the problem with sending and receiving messages.
Hello, thank you for this very interesting tutorial. I would like to subscribe to a couple of topics “by book”, and I don’t know exactly if this is the right way to do it:
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag = True
        print('Yee!')
    else:
        client.bad_connection_flag = True
        print('Err: ', rc)
    try:
        (result, mid) = client.subscribe(topic_wifi, qos)
        while result != 0:
            print('… …')
            (result, mid) = client.subscribe(topic_wifi, qos)
    except:
        print('NC…')
        exit(-3)
So, I created a try-except block within my on_connect callback because otherwise, I don’t know why, the client is not able to subscribe to any topic. Moreover, I think that when the result == 0, then, the connection is stable, otherwise we could try to resubscribe to that topic, but I don’t know if it is good, because a non-zero result could be a sign of wrong parameters, thus, it is pointless to try to resubscribe. Last but not least, I would like to test the mid values, but what should I do if those values are not the same? To try to resubscribe, or to abort the execution of the script?
Thank you.
The better way to do it is to process the SUBACK, which returns the mid, and compare it to the mid that was returned by the subscribe call.
I did have some code somewhere and I will try to locate it if you are stuck.
Rgds
Steve
Is it possible to subscribe to the same topic from different processes?
Example:
process1 subscribes to topic
process2 subscribes to same topic
Question -> The Callbacks for this topic are called in both processes?
Not sure what you mean by different processes? You can use a script with multiple clients that subscribe to the same topic. Is that what you mean?
Rgds
Steve
I have established a persistent connection between publish and subscribe, QoS = 1, but the subscriber speed is very slow, and there will be data loss
this is subscribe:
client = mqtt.Client('client_sub', clean_session=False)
client.subscribe('/test/#', qos=1)
and publish:
client = mqtt.Client('client_pub', clean_session=False)
client.publish('/test/test01', json.dumps(param), qos=1)
The data size is 280kb
Not sure what you mean by subscribe speed being slow
Rgds
Steve
That is,Publishers can quickly publish 10,000 pieces of data, but it takes a long time for subscribers to receive them, and data can be lost.
If you publish and subscribe with a qos of 1 then you should be ok. Are you using your own broker or a public broker?
Hi Steve,
All your tutorials are great. it always helped me to understand the concepts.
I have a couple of doubts.
1) There is a message broker running on RabbitMQ with port 8883 (requires TLS, I suppose). Some publishers send messages every 15s to a particular topic. I have set up a client as a subscriber to this topic to receive the messages.
I was not able to establish connection when I set up the subscriber with port 8883, but the connection was successful when I set up the subscriber with port 1883 ( even though the broker instance port is 8883 ).
2) Even though, I was able to establish the connection when I set up the subscriber with port 1883. I am not receiving all the messages. I would like to know if it’s because I am connecting via 1883 instead of 8883. I also tried subscribing to the topics with QoS = 1,2 etc. But it does not matter and messages are still being missed.
Kindly let me know if I am missing out anything. Any wild guess what the problem could be ?
The port the messages are sent to makes no difference; it is the topic that is important.
If you aren’t seeing messages then maybe there are restrictions on the topic. Are you sure the topic is available?
First of all thanks steve for wonderful course.
I need a little help how can I subscribe to new topics in a running envirment. What is the best practice to do so. thanks
You just need a simple check in the loop. I would drop new subscriptions into an array or queue and then check the array or queue periodically.
Does that make sense
Rgds
Steve
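The queue idea could look something like the sketch below, which uses only the standard library. The function names are hypothetical, and `client` is assumed to be a connected paho client whose main loop calls `drain_subscriptions` periodically.

```python
import queue

pending_subscriptions = queue.Queue()  # filled by any part of the program


def request_subscription(topic, qos=0):
    """Ask for a new subscription without touching the client directly."""
    pending_subscriptions.put((topic, qos))


def drain_subscriptions(client):
    """Subscribe to everything queued so far; call this from the main loop."""
    sent = []
    while True:
        try:
            topic, qos = pending_subscriptions.get_nowait()
        except queue.Empty:
            break
        sent.append(client.subscribe(topic, qos))
    return sent  # list of (result, mid) tuples, one per new topic
```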
Hi Steve, I do have a few unclear questions that I’d be extremely grateful if you were able to answer!
When I started getting interested in mqtt, I stumbled upon a simple mqtt format, where using the terminal, I was able to subscribe to a topic and then opening up to another terminal on Rpi was able to send a message and see the reply on the subscription. It was made using the Mqtt Mosquitto broker. (It was a simple set of code, i.e mosquitto_sub -h localhost -t “test” )
Now using Paho, I followed your tutorials and if I understand correctly, the client.subscribe and client.publish are on the same script and after a successful run, I get the value on the same terminal, and I was wondering why does this happen? Is there any way of knowing that the message was successfully sent and received? If it was received, is there any way of knowing?
Secondly, I read about the broker and using the local one, currently I’m using my IP address as host address and was wondering, is it better to use an online broker or use a local broker? How hard is it to create a local broker as my future step is to allow the messages published to be on a webpage, and possibly on some cloud? I’m currently seeing what my limitations. Are those two things interconnected and thank you for your time!!
Hi
It is easier to use you own local broker as you can see the console .
Regarding publish and subscribe there is no way of knowing that the subscriber has received the message unless he sends a reply.
This tutorial may help
http://www.steves-internet-guide.com/two-way-communication-mqtt-python/
Hi Steve
I am working on home automation project. I am using Onion Omega2s+ (I had made custom PCB). So on board i have two network interface (WIFI and Ethernet).
I am using python and paho mqtt package.
When interface used to change from WIFI to Ethernet or vice versa, I am facing problem in subscribe part. Subscribe not working.
Can you help me please.
Regards
Suman
Do you have access to the mqtt broker console. If so can you see the subscribe. I assume you can publish how do you know that do you see it at the broker. Is it the same with Ethernet and wi-fi
Rgds
Steve
Hi Steve, thank you for probably the most detailed and easy to understand mqtt tutorial on the internet. Regarding the on_wait function could you perhaps give an example of how you would use it. Should I pass anything to the function as I currently can’t get it to work? Should the on_wait function be put between the loop start and stop?
The on_wait function is just to make sure that you are connected or subscribed before you proceed. I often use a simpler format and I’ll edit the tutorial to include it.
It relies on a flag which you set in the on_connect callback. I call it connected_flag and initially set it to False,
so in the main code I use:
while not client.connected_flag: #wait in loop until connected
    print("In wait loop")
    time.sleep(1)
then you can go on and subscribe or publish
rgds
Steve
Hi
I have subscribed to multiple topics QoS 0 and the number of data I am receiving from the topics are different. The 5 publishers are sending 80 readings in one second each on different topics. But the subscriber is able to receive roughly 80 readings from just 1 or 2 topics while the other left topic has only 50% of the data.
Can this be improved or need to shift to some other protocol for this application?
Hi
Is this a total of 400 messages/sec?
Are you in control of the publishers?
Rgds
Steve
Hi, I have two scripts : one to publish messages on a topic and the other one to subscribe to this topic and show the message. Can you tell me how to print this message ? Do I need to use the on_message function?
Yes
There is example code here
http://www.steves-internet-guide.com/python-mqtt-publish-subscribe/
rgds
Steve
Having subscribed to multiple topics, I don’t quite see how I can ascertain which client was publishing the data. For example, I have two clients, each publishing a list of temperatures. How do I know which published the message my subscriber is seeing?
Thanks!
Hi
You don’t know which client sent the messages. That is the nature of MQTT.
If you needed to know that then you would need to identify the sending client in the payload.
Does that make sense?
rgds
Steve
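Identifying the sender in the payload might look like the sketch below, which wraps the data in JSON. The field names "sender" and "data" and both function names are assumptions for illustration only.

```python
import json


def build_payload(sender_id, data):
    """Publisher side: wrap the data together with the sender's id."""
    return json.dumps({"sender": sender_id, "data": data})


def parse_payload(raw):
    """Subscriber side: `raw` is the bytes payload passed to on_message."""
    message = json.loads(raw.decode("utf-8"))
    return message["sender"], message["data"]
```

The publisher would call `client.publish(topic, build_payload("sensor-1", temps))` and the subscriber would call `parse_payload(message.payload)` inside on_message.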
I want to know who published the message.
If a different client publishing messages, At subscriber side I should know who is publishing messages i.e; client id and user id.
How will I come to know who is publishing messages in subscriber application?
Hi
The client_id is not sent with the published message from the broker and so you can’t know the id of the sender unless the sender includes the id in the message payload.
Rgds
Steve
Hello,
I have created subscribe and publisher scripts in python separated. Once subscribe it receives messages from publisher, if I subscribe again it receives earlier messages also. So I am trying to do => once the messages recieved, I dont want that message when I subscribe again. So can I clear that message in subscriber script once I received message. SO next time I subscribe that topic it will not receive.
I know there is reset function we can write, but in publisher.
I want to do same reset thing in subscriber. Is there any way to do this?
Hi
It is probably because you are publishing with the retain flag set. See http://www.steves-internet-guide.com/mqtt-retained-messages-example/
If you don’t use the retain flag then you shouldn’t receive the message twice.
rgds
steve
Hello, I am still trying to understand how to implement MQTT. But I would like to know if it is possible to implement a single subscribe for multiple topics, for example
(“room1 / temp”)
(“room1 / led”)
(“room2 / temp”)
(“bedroom2 / led”) ….
Hi
Yes you can use an array of tuples (topic, QoS)
[("room1/temp",0),("room1/led",0) etc]
rgds
steve
The company you talk about at the related tutorials is called “HiveMQ”, not “Hive MQTT” 🙂
Corrected I use hive and not hive-MQ The series is mqtt essential series.
Rgds
Steve
Hello Steve,
Is there any chance to somehow have different callbacks for different topics?
something like
client.subscribe(node + "switches/+/set")
client.on_message = on_message_switches
client.subscribe(node + "pve/#")
client.on_message = on_message_pve
I know workaround, which would be simply parse message_topic and have bunch of if topic in message_topic then do stuff, but I would like to see if there is more elegant way 😉
Thanks!
Hi
Not as far as I know you will have to do it by filtering the topic as you said.
Rgds
Steve
sweet, thanks 😉
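The topic filtering approach can be kept tidy with a small router that maps topic filters to handler functions. The sketch below uses a simplified wildcard matcher of my own (it does not special-case $SYS topics). As far as I know, paho-mqtt also provides message_callback_add(sub, callback), which may make a hand-written router unnecessary.

```python
def topic_matches(topic_filter, topic):
    """Return True if `topic` matches an MQTT filter containing + or #."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            return True  # multi-level wildcard matches the rest
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    return len(filter_parts) == len(topic_parts)


class TopicRouter:
    """Dispatch each incoming message to every handler whose filter matches."""

    def __init__(self):
        self.routes = []  # list of (topic_filter, handler)

    def add(self, topic_filter, handler):
        self.routes.append((topic_filter, handler))

    def on_message(self, client, userdata, message):
        for topic_filter, handler in self.routes:
            if topic_matches(topic_filter, message.topic):
                handler(client, userdata, message)
```

Register handlers with `router.add("node/switches/+/set", on_message_switches)` and assign `client.on_message = router.on_message` once.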
Hi Steve! Thanks for this tutorial!
I would like to ask you: is there a way to know if my client was already subscribed to a certain topic before trying to subscribe again to it?
And if this is not possible, what happens if I try to subscribe again to a topic which I was already subscribed?
Thanks for your time!
Hi
No there isn’t but if you subscribe again it doesn’t cause a problem
Steve,
Thanks for these great tutorials. One question: If a client subscribe to one topic and later publish to the that topic. How can one discard its own published message from on_message callback and process the one published by other client. Is the client_id of the sender available in the message payload?
Currently this is not possible unless you insert some form of identifier in the message.In MQTT v5 you can do this with a subscription option.
Currently mosquitto doesn’t support MQTT v5
will anyone tell me how to add QOS 2 ….?
any sample code will be great…
Thank you.
Use
subscribe(topic, qos=2)
def On_message(client, userdata, msg):
    if str(msg.payload) == "b'start'":
        while True:
            print("in while loop")
            if str(msg.payload) == "b'stop'":
                print("stop")
if the message start is sent then the code is stuck in while loop
i am unable to receive the message stop when the message start is sent.
Don’t run the loop in the on_message callback. In the on_message callback use
def On_message(client, userdata, msg):
    msg_received = str(msg.payload.decode("utf-8"))
    if msg_received == "Start":
        client.startflag = True
    if msg_received == "Stop":
        client.startflag = False
In the main Loop use
client.startflag = True
while client.startflag:
    pass  # loop code here
thanks a lot, struggling to understand that ‘b’
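On the 'b': the payload arrives as a Python bytes object, and calling str() on bytes gives its printed representation, including the b prefix and the quotes. Decoding gives the plain text. A short illustration, with a made-up payload:

```python
payload = b"start"  # the type paho passes as msg.payload

as_repr = str(payload)             # the representation, with b and quotes
as_text = payload.decode("utf-8")  # the actual text that was sent

print(as_repr)  # prints b'start'
print(as_text)  # prints start
```

Comparing `payload.decode("utf-8") == "start"` is therefore simpler than matching against the string "b'start'".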