When writing code using the Paho Python client you will have had to use the loop() function.
In this tutorial we will look at what it does, and why it is necessary.
When new messages arrive at the Python MQTT client they are placed in a receive buffer.
The messages sit in this receive buffer waiting to be read by the client program.
You could program the client to manually read the receive buffers but this would be tedious.
The loop() function is a built in function that will read the receive and send buffers, and process any messages it finds.
On the receive side it looks at the messages, and depending on the message type, it will trigger the appropriate callback function. See Understanding callbacks.
For example if it sees a CONNACK message it triggers the on_connect() callback.
Now instead of manually reading the receive buffer you just need to process the callbacks.
Outgoing messages and message acknowledgements are placed in the send buffer.
The loop function will read this buffer and send any messages it finds.
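As a minimal sketch of the receive side described above, the function below attaches two callbacks to a client. It assumes the paho-mqtt 1.x callback signatures used elsewhere in this tutorial, and the helper name attach_callbacks is only illustrative. The callbacks do nothing until a loop is running, because it is the loop that reads the CONNACK or PUBLISH packet and then calls them.

```python
def attach_callbacks(client):
    """Attach simple callbacks; the network loop calls them when packets arrive."""
    client.connected_flag = False  # simple flag, set from the on_connect callback

    def on_connect(client, userdata, flags, rc):
        # Called by the loop when the CONNACK arrives; rc == 0 means accepted.
        client.connected_flag = (rc == 0)

    def on_message(client, userdata, message):
        # Called by the loop for each incoming PUBLISH.
        print(message.topic, message.payload)

    client.on_connect = on_connect
    client.on_message = on_message
    return client
```

You would call attach_callbacks(client) before connecting, then start one of the loops described in the next section.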
Calling the Loop Function
See network loop in docs for function reference.
The Paho Python client provides three methods:
- loop_start()
- loop_forever() and
- loop().
The loop_start() starts a new thread, that calls the loop method at regular intervals for you. It also handles re-connects automatically.
To stop the loop use the loop_stop() method.
The loop_forever() method blocks the program, and is useful when the program must run indefinitely.
The loop_forever() function also handles automatic reconnects.
The loop_forever() call returns when you call client.disconnect().
You should stop the loop before you exit the script.
You can also manually call the loop() method in your program.
If you do this you must remember to call it regularly.
That is, it must be in a loop, e.g. the pseudo code below:
while True:
    # some code
    client.loop(.1)  # blocks for up to 100ms
    # some code
Because loop() is a blocking function I call it with a timeout. The default timeout is 1 second.
If you call the loop manually then you will need to create code to handle reconnects.
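A minimal sketch of what that reconnect handling could look like is shown below. It relies on loop() returning a non-zero code when the connection has a problem and on reconnect() reusing the details passed to connect(). The function name, the keep_running callable and the retry delay are assumptions made for illustration, not part of the Paho API.

```python
import time

MQTT_ERR_SUCCESS = 0  # same value as paho.mqtt.client.MQTT_ERR_SUCCESS


def run_manual_loop(client, keep_running, timeout=0.1, retry_delay=1.0):
    """Call client.loop() repeatedly and try to reconnect when it reports an error."""
    reconnect_attempts = 0
    while keep_running():
        rc = client.loop(timeout)  # blocks for at most `timeout` seconds
        if rc != MQTT_ERR_SUCCESS:
            reconnect_attempts += 1
            try:
                client.reconnect()  # reuses the host/port given to connect()
            except OSError:
                time.sleep(retry_delay)  # broker unreachable, wait and retry
    return reconnect_attempts
```

Here keep_running is any function that returns False when the script should stop, for example one that tests a run flag.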
Important! If your client script has more than one client connection then you must call or start a loop for each client connection.
For example, if I create two clients, client1 and client2, in a script, then you would expect to see client1.loop() and client2.loop() in the script.
Implementation Note:
I have experienced strange behaviour when starting a loop before creating a connection. So
client = mqtt.Client(cname)
client.connect(broker, port)
client.loop_start()
works OK, but
client = mqtt.Client(cname)
client.loop_start()
client.connect(broker, port)
sometimes gives strange results.
Stopping the loop automatically
If you are using the loop_start() function then you will probably need to stop the loop automatically if the connection fails.
The easiest way of doing this is using the on_disconnect callback.
def on_disconnect(client, userdata, rc=0):
    logging.debug("DisConnected result code " + str(rc))
    client.loop_stop()
However you should only stop the loop if you are completely finished, and are going to exit.
Stopping the loop will stop auto reconnects unless you take steps to call the loop manually.
Loop_start vs Loop_forever
loop_start() starts a loop in another thread and lets the main thread continue. If you need to do other things in the main thread then it is important that it doesn’t end.
To accomplish this you need to use your own wait loop.
The loop_forever call blocks the main thread and so it will never terminate.
The loop_forever call must be placed at the very end of the main script code as it doesn’t progress beyond it.
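The loop_start() pattern can be sketched as below. This is only an outline under stated assumptions: the function name and the keep_running callable are made up for the example, and the client is expected to be connected already.

```python
import time


def run_in_background(client, keep_running, interval=1.0):
    """Start the network loop thread, hold the main thread, then stop the loop."""
    client.loop_start()
    try:
        while keep_running():
            time.sleep(interval)  # sleeping avoids a busy loop that pegs the CPU
    except KeyboardInterrupt:
        pass  # Ctrl-C ends the wait loop
    finally:
        client.loop_stop()  # always stop the loop thread before exiting
```

With loop_forever() none of this is needed: the call itself holds the main thread, which is why it suits scripts that only wait for incoming messages.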
Handling Multiple Clients
If your script connects using multiple clients then each client will need a loop.
Therefore if you are using the loop_start() method then you will need to call it for each client connection.
Therefore if you have 2 client connections you will need two loops which equals two additional threads.
For 2 clients it isn’t really a problem, but what if you have several hundred client connections then you will need several hundred additional threads.
In this situation it is better to manually call the loop for each client and use thread pooling.
See – Handling Multiple MQTT Client Connections Using Python
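As a rough sketch of calling the loop manually for many clients from a single thread, the helper below makes one pass over a list of clients. The function name and the idea of returning the clients whose loop() reported an error are assumptions for illustration.

```python
def service_clients(clients, timeout=0.01):
    """Call loop() once on each client; return the clients that reported an error."""
    failed = []
    for client in clients:
        rc = client.loop(timeout)  # waits up to `timeout` seconds for traffic
        if rc != 0:  # 0 is MQTT_ERR_SUCCESS
            failed.append(client)
    return failed
```

You would call service_clients(clients) repeatedly from your own while loop and reconnect any clients it returns. Keep the timeout small, because with several hundred idle clients one pass can take up to the number of clients multiplied by the timeout.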
However for up to around 20 client connections it is easier to use the inbuilt loop_start() and loop_stop() functions.
To make it simpler add the clients to a list and loop through the list to start the loops and the same to stop e.g
import paho.mqtt.client as mqtt
clients = []
nclients = 20
mqtt.Client.connected_flag = False
# create clients
for i in range(nclients):
    cname = "Client" + str(i)
    client = mqtt.Client(cname)
    clients.append(client)
for client in clients:
    client.connect(broker)
    client.loop_start()
Common Questions and Answers
Q- My callbacks aren’t being called. Why is that?
A- You need to start a loop or call the loop() function to process callbacks.
Q- My script has ended but the loop still appears to be running?
A- It probably is. You need to stop the loop before exiting the script.
Q- Why call the loop manually when you can use loop_start()?
A- The main reason is when the script has lots of client connections (>20), because with loop_start() you need a loop, and so a thread, for each client.
Summary
The built in loop functions will process the receive and send buffers to send and receive messages and message acknowledgements.
To make life easier use the loop_start() and loop_stop() functions unless you have tens of client connections.
Course Links
- Introduction to the Paho Python MQTT Client
- Introduction to the Client Class
- Connecting to a Broker
- Publishing Using The Paho Python MQTT Client
- Subscribing using The Paho Python Client
- Receiving Messages with the Paho MQTT Python Client
- Understanding The Loop
- Understanding Callbacks
- Handling Multiple Client Connections
First and foremost thanks for the great how to’s which are my regular place to find MQTT answers.
I do not see the loop.stop() which you refer to
“The loop can be stopped by calling loop.stop()”
However, loop_forever() can be stopped with client.disconnect()
Using the code below, causes one of my Cores to run at 100%. If I add a time.sleep(.1) to the while loop, the core then runs at a minimal level. I do not seem to miss messages but my test system is lightly loaded. According to the notes above, the MQTT loop is run in another thread, should this be a problem?
I could not find another way to yield time back to the system and polling for a keypress with 100% cpu time seems pointless!
client.loop_start()
while 1:
    if msvcrt.kbhit():
        CharIn = msvcrt.getch()
        if CharIn == b'Q' or CharIn == b'q':  # Escape Key: b'\x1b':
            print("Quitting Now")
            client.loop_stop()
            exit()
    # Slow Polling to ~ 10 / second
    time.sleep(.1)
Peter
Yes, I need to modify the code to add a slight delay in the while loop. It doesn’t affect the messages received as they are handled in another thread.
rgds
Steve
Hi Steve,
Do you have any pseudo code to implement multi-threading while subscribing to many number of topics via share subscription.
Subscriber will read the messages and stores on to the db. If code is a single threaded, its processing messages too slow and by the time we have the messages in db we are having a big-delay.
Thinking to have queue which will be filled by on_message and worker thread may be some 10 to read and store them to db parallel so improve processing time.
Yes the data logger does exactly that
http://www.steves-internet-guide.com/logging-mqtt-sensor-data-to-sql-database-with-python/
this logs to a file
http://www.steves-internet-guide.com/simple-python-mqtt-data-logger/
Rgds
Steve
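The queue-and-worker idea from the question above can be sketched roughly as follows. The names message_queue, worker, start_workers and the store function are hypothetical; store stands in for whatever writes one message to the database, and in practice each worker would probably need its own database connection.

```python
import queue
import threading

message_queue = queue.Queue()


def on_message(client, userdata, message):
    # Keep the callback short: just hand the message to the workers.
    message_queue.put((message.topic, message.payload))


def worker(store):
    # `store` is a hypothetical function that writes one message to the database.
    while True:
        item = message_queue.get()
        if item is None:  # sentinel: stop this worker
            message_queue.task_done()
            break
        try:
            store(*item)
        finally:
            message_queue.task_done()


def start_workers(store, count=10):
    threads = [threading.Thread(target=worker, args=(store,), daemon=True)
               for _ in range(count)]
    for t in threads:
        t.start()
    return threads
```

Because on_message only puts the message on the queue, the network loop is free to read the next message while the workers do the slow database writes.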
Dear Steve,
Thanks for the post.
I have a subscriber client which basically does the below:
client = mqtt.Client()
client.tls_set(ca_certs="src/certs/ca.crt")
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker", 1883, 60)
client.loop_forever()
on_connect — will subscribe to a shared-subscription on a topic structure $share/group-name/projects/#
on_message — reads the data and persists to database.
Running the client as a kubernetes workload. It works fine.
But while doing load testing I noticed a weird behaviour:
After crossing around 7000 concurrent connections to broker — Have a load-test script to send 7000 concurrent connections to broker on a specific topic to which my above client have the subscription– The client stop receiving messages. Its just sit idle even though there are messages coming on to the topic. If I spin up a new replica or have another new publisher to send messages its receiving them.
Are there any specific setting we need to tweak so that we can accommodate concurrent connections.
Any advice will be appreciated.
Nothing to tweak. I would suspect that the broker loses track of the connection mapping. See if you can get the client to unsubscribe and resubscribe when in that state and if that gets it working again.
If not try disconnect and reconnect.
Rgds
Steve
Hello Steve, thanks for all this help to everyone including me!
I have a specific issue:
I have written a main python program called ‘main.py’, that imports a part for the MQTT, with just an ‘import mqtt.py’.
The mqtt.py has both the messageFunction defined (code below), and at the end also the initialisation to connect and the ‘ourClient.loop_start()’.
I would assume the import and the loop_start are enough so that the messageFunction is called every time a message comes in, but it doesn’t work like that. It only runs the messageFunction once (I can see the value is updated in the SQL only once).
Although the main.py is in an indefinite loop, the messageFunction is not working after the first time anymore…
Any thoughts?
code mqtt.py:
import paho.mqtt.client as mqtt  # Import the MQTT library
import time  # The time library is useful for delays
import datetime
import mysql.connector
mydb = mysql.connector.connect(
    host="localhost",
    user="root",
    password="xxxxxxx",
    database="HeatBatt"
)
# "on message" event executes all this on every new message of the MQTT
# - First read and decode the power to and from the grid, from the MQTT message
# - Write these values (as string) into the database
# messageFunction below is executed every time a message comes in.
def messageFunction(client, userdata, message):
    # Decode MQTT message and write it to the database
    global PushPower
    topic = str(message.topic)
    mssge = str(message.payload.decode("utf-8"))
    PowerString = str(message.payload)
    positionstartCONSUME = (PowerString.find("svalue5"))
    positionstartFEED = (PowerString.find("svalue6"))
    positionendCONSUME = (PowerString.find("svalue6"))
    positionendFEED = (PowerString.find("unit"))
    #debug: print("Consumption is: ", PowerString[positionstartCONSUME+12:positionendCONSUME-7])
    ConsumptionString = (PowerString[positionstartCONSUME+12:positionendCONSUME-7])
    #debug: print("Feed is: ", PowerString[positionstartFEED+12:positionendFEED-7])
    FeedString = PowerString[positionstartFEED+12:positionendFEED-7]
    # Write the values to the database
    mycursor = mydb.cursor()
    # Write Grid Consumption in SQL
    sql = "UPDATE settings SET setting_value = %s WHERE setting_name = %s"
    val = (ConsumptionString, "Grid_Consumption")
    mycursor.execute(sql, val)
    sql = "UPDATE settings SET setting_value = %s WHERE setting_name = %s"
    val = (FeedString, "Grid_Supply")
    mycursor.execute(sql, val)
    mydb.commit()
# Create the MQTT client connection to the MQTT broker/server
ourClient = mqtt.Client("Zero_client_mqtt")  # Create a MQTT client object
ourClient.connect("192.168.123.8", 1883)  # Connect to the test MQTT broker
ourClient.subscribe('domoticz/out/Power')  # Subscribe to the topic AC_unit
ourClient.on_message = messageFunction  # Attach the messageFunction to subscription
ourClient.loop_start()  # Start the MQTT client
It looks like the script is terminating.
Either use loop_forever in place of loop_start or put a holding while loop at the end
while True:
    time.sleep(1)
rgds
steve
In my subscribe code.py, after a period of time
my code exits with the result code 0.
How can I avoid this issue? I am using client.loop_forever() but it doesn’t solve my issue. Is there any solution for that?
Can you post the code I will not publish it.
rgds
Steve
Hi Steve is there any solution for my subscribe code
Hi Steve,
Quick question: how can I run a command at every 5 seconds inside this client.loop_forever()?
I want my program to able to check something every 5 seconds, not only when a message is received.
Thx!
I wouldn’t use loop_forever, just use loop_start and at the end of the script use
while True:
    time.sleep(5)
    check()  # your code
Does that help
Rgds
Steve
Hey Steve!
Thanks for such a great tutorial on using the Paho MQTT client. I have an issue using the client.loop_start(). I will explain my use case first:
I am subscribing to a thingspeak channel with a keepalive of 10s (thingspeak disconnects clients after 15 seconds of inactivity). I call different functions depending on the data received from the subscribed channel. One of those functions takes around ~20 seconds to get executed (involves usage of actuators and sensors). Lastly, I need to publish a value back to a channel after execution. But the client gets disconnected meanwhile. I suspect the keepalive msgs aren’t sent when my function is executed. But this should not happen right as loop_start() runs on a different thread and is not affected by the main thread. Can you suggest a workaround for this? A possible approach is disconnecting and reconnecting after the function executes and publishing the data. But this isn’t desirable in my case as this is a real-time use case, and I need to publish ASAP after the function executes.
———————————————————–
My code looks like this:
….
def on_message(…):
if (): func1()
else: func2()
client.connect(broker,port,keepalive=10)
client.subscribe(sub_topic)
client.loop_start()
…
def func1(): # takes ~20 seconds to execute
def func2(): # takes ~5 seconds
——————————————————–
I really appreciate any help you can provide.
To confirm the keepalive is the problem increase it to 60secs. Why is it 10s anyway?
If it is then start both functions in another thread.
rgds
steve
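For reference, a minimal sketch of starting the slow functions in another thread is given below. With loop_start() the callbacks run in the loop thread itself, so an on_message that blocks for 20 seconds also stops that thread from sending keepalive pings. The bodies of func1 and func2 and the condition used to choose between them are placeholders, since the original code does not show them.

```python
import threading


def func1():
    """Placeholder for the slow (~20 s) job from the question."""


def func2():
    """Placeholder for the faster (~5 s) job."""


def on_message(client, userdata, message):
    # Pick the job, then run it in its own thread so the callback returns at once
    # and the network loop can keep sending keepalive pings.
    job = func1 if message.payload == b"1" else func2  # hypothetical condition
    worker = threading.Thread(target=job, daemon=True)
    worker.start()
    return worker  # Paho ignores the return value; returned here for convenience
```

The job can call client.publish() itself when it finishes, as publish() can be called from another thread while the loop is running.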
Hi Steve, thank you so much for this.
I have a quick question: there is a Matlab mqtt code I need to rewrite in Python and there’s a line code that says:
while FU.connected (beginning of a while loop, where FU is a random client object)
how can I write this condition for a while loop in python?
Thanks you very much for all the work you do.
in my scripts I set a flag in the on_connect callback like so
client.connected_flag=True
and test it
while client.connected_flag:
    pass  # do something
Hope that helps
rgds
steve
Hi,
how can you add a topic after connect the client and start the loop?
Not sure what you mean by add a topic? Do you mean subscribe?
rgds
steve
Hi Steve, thanks a lot for these articles. They’ve all been very useful!
I was reading about the receive buffers in this article and I was wondering if those could be the reason why my paho-client displays a behaviour that seems weird to me.
I have a fairly simple setup with a sensor sending data to a broker in the format {timestamp=t0 ; measurement=v0}, and a paho-mqtt Client on a computer that receives, displays and stores the sensor data from the broker.
I noticed that a message I received had a timestamp that preceeded by a minute the time at which the Client’s on_message function was called, so I did some measurements to see if there was something wrong or if my Client was too slow to process the data, because this delay seemed fairly big.
I noticed that all the timestamps from the sensor preceed the timestamp at which the on_message function is called by a lot, sometimes even for up to 99 seconds! While a delay was expected, the weird behavior is that the time difference grows up to 99 seconds and then goes to about 0 and then starts to grow again. I was wondering if this could have something to do with the buffers used by the Client? Or if maybe the problem could be on the broker side since the sensor sends a lot of data really quickly, so maybe it’s the broker’s queue that creates this “oscillating delay”.
I’m attaching a plot of the measurements for better clarity: https://imgur.com/a/zt3l5xm
I call “timestamp of reception” the timestamp that I save when the on_message function is triggered and “timestamp of transmission” the timestamp stored inside the packet. Would it be possible to tell paho to save a timestamp as soon as a packet enters the receive buffer, or would that require to program the buffer handling myself?
Thanks in advance for any insight you might have on this issue.
Can you send me the code you are using?
rgds
steve
Hi Steve, thank you for these articles. Do you know where the correct place to call `client.loop_stop()` is? The Paho documentation is unclear about this. I’ve found that if I call it in `on_disconnect()` then subsequent attempts to explicitly reconnect the same client (with mosquitto) will fail (connection is accepted but no Broker session is initiated, everything seems to just stop). But if I call it just before calling `client.disconnect()` then there’s a race with any pending subscription/unsubscription/messages that might be in-flight at the time.
In your experience, where’s the best place to stop the network loop?
Hi
You call it before the main script terminates. I have lots of scripts that hold the script using a while loop
as below
try:
    while run_flag:
        pass  # code here
except KeyboardInterrupt:
    print("interrupted by keyboard")
time.sleep(2)
client.loop_stop()  # stop loop before quitting
Hi, Steve – thanks for the great tutorial!
I do have a question – I’m running some software I got from a 3rd party, that uses the Paho MQTT library to publish messages, but, it appears that it’s NOT calling a loop function.
It just calls connect, and then publish, to publish a bunch of topics. I have mosquitto running as a broker, and I can see the messages using mosquitto_sub.
I’m having an issue with the messages just STOPPING after some random period (or at least I haven’t yet been able to find the trigger). The code has the publish call inside a “try” block; but, it’s not throwing any exceptions, the messages just stop. While they’re stopped, I can use mosquitto_pub from the commandline and that message will go through.
I’m gonna go add a call to loop_forever and define the callbacks; but I wanted to see if you had any thoughts, and if this might be “expected” behavior.
Thanks!
Hi
I suspect you are publishing with a QoS of 1 or 2. Strictly speaking the publish doesn’t need the loop to send the message. But it does need it to process the acknowledgements used with QoS 1 and 2, so I suspect it has the messages queued as they have not been acknowledged, and it stops when the queue is full.
Rgds
steve
Hi Steve
We are using a Python script as a Middleware in between a Twincat SPS and a cloud-based Platform.
From the Twincat SPS we are sending and receiving Data via OPC UA and sending it to the Platform with MQtt. We’ve managed to get this working quite well, but there is one problem we can’t solve.
When the connection to the OPC UA Server is lost or closed, we are getting an Error in the Program but the loop for the MQtt Client is still running. Is there a way to stop the loop as soon as an error in the main program occurs? The part of the Error where the Error occurs is in a while true loop.
TIA
Jannis
Hi
Does the python script exit? But the loop continues as you are seeing ping messages?
Rgds
Steve
The Python Script doesn’t exit as we are not getting an exit code. The Connection to the MQtt Broker happens before the error in the program occurs. Therefore, the Loop doesn’t stop, but the program does. It would be perfect if there would be an option to stop the loop when an error in the program occurs.
Can you use the ask steve page and send me the script.
steves-internet-guide.com/ask-steve/
I’m assuming you don’t want to reconnect on disconnect and then you will restart the script manually.
Rgds
Steve
Hi Steve, I want publish data to broker once connected. I would want to the data to be sent every second.
However, I would also want to stop publishing if connection with broker suddenly failed. Then I want attempt to reconnect indefinitely. Once reconnect success, I would like to continue publishing the data.
I understand that loop_forever() may be useful here, but how should I approach this ? How do I make publish every second ONLY if connected to broker?
It will be very helpful if you can explain the flow or pseudocode for this one. Thanks!
I usually set a flag in the on_connect callback and test it before publishing. In the latest version of the client there is an is_connected method that does the same, so
if client.is_connected():
    publish
does that make sense?
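A rough sketch of that flow, publishing once a second but only while connected, might look like the following. It assumes loop_start() has already been called so that the client reconnects in the background; the function name and the get_payload and keep_running callables are made up for the example.

```python
import time


def publish_while_connected(client, topic, get_payload, keep_running, interval=1.0):
    """Publish once per interval, skipping the publish while disconnected."""
    sent = 0
    while keep_running():
        if client.is_connected():
            client.publish(topic, get_payload())
            sent += 1
        time.sleep(interval)
    return sent
```

While the broker is unreachable the loop keeps ticking without publishing, and it carries on publishing once is_connected() returns True again.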
I have an issue with that. I tried shutting down my broker and the on_disconnect function does not seem to be called until the broker is restarted again.
And due to that, the connected_flag is always True until the broker is restarted, then False for awhile and then True again (presumably because reconnected successfully).
Therefore, I couldn’t publish based on the connected_flag.
I’ve used loop_start() and a while True block in my code.
P.S. Most of the code was modified based on your client_connect_status.py demo file.
http://www.steves-internet-guide.com/checking-active-mqtt-client-connections/
What could be the problem?
Hi
Can you use the ask-steve page and send me the code and I’ll try it
Hi, Steve!
I’m a beginner trying to learn MQTT. I’m so sorry if this is kind of a dumb question for you, but I was wondering what should I do to create pub-sub connection between 2 laptops? Should I run a script to subscribe in one laptop and publish in the other? Would the connection be established automatically if I use “mqtt.eclipse.org” as my broker, or should I specify something to make sure they can recognize each other?
Thank you for your helpful tutorial.
Yes each laptop would run a pub_sub client. The choice of broker isn’t important as long as they use the same one. Running your own local broker means that you have access to the broker console which is very useful for debugging and learning.
Rgds
Steve
Hi Steve,
i am using Mqtt for out IOT device , we have 2(A,B) servers, each one having the flask micro services,
from server A i am sending the data you can consider as a producer and from server B micro service i am receiving the data , after 1-2 days if we not restarted services B micro service is not receiving the data , if we restart the service A , then all the data is publishing at once because of qos=2
can you please let me know how to avoid this mqtt not responding state,
we are using the 2 flask service , when ever our initializes at the same time we are initializing the mqtt
def init_mqtt():
    """MQTT client connections."""
    try:
        broker = config.MQTT_BROKER_IP
        CLIENT.connect(broker, 1883, 60)
        CLIENT.subscribe('operations_ack', 2)
        CLIENT.message_callback_add("operations_ack", delete_operations)
        CLIENT.loop_start()
    except Exception as err:
        log.error("Mqtt Error in init_mqtt-" + str(err))
__init__.py
def create_app():
    """ to create and configure the flask application."""
    try:
        app = Flask(__name__, instance_relative_config=True)
        CORS(app)
        # Initializing the Mqtt client
        from .common import mqtt_client
        mqtt_client.init_mqtt()
    except Exception as err:
        log.error("error at app creation")
Sorry
I’m not really familiar with flask. Not sure about the mqtt not responding state. It seems that from your description it works ok and then stops sending after a few days is that correct.
Did you check if the messages had been published and queued on the broker or if they are being queued on the sender?
rgds
steve
Hi Steve,
The messages are queued on the broker only, they are not received to subscriber side , but when we restart flask services all messages are sending at once, and i have written a call back for disconnect but it is not calling.
So the subscriber client is hanging? Not sure what you mean by written a callback for disconnect. When are you trying to disconnect? Don’t you want to receive all of the messages?
Like if connection is disconnected then the callback will call right with rc=1.
If it is not being triggered then I guess the connection is still open or at least the client and server think it is.
Do you see the ping requests?
I am doing my tasks in threads, and using client.loop_forever().
But when I close my GUI, the loop is still working, any way to stop it
You need to stop the threads before exiting
rgds
steve
I have been having a lot of trouble with thie loop_forever function.
I have a project like this:
def function():
“let a led blinking a bit”
def on_message(client, userdata, message):
if msg == 0:
function()
client.loop_forever()
The problem is while the function let a led blinking i can’t receive new messages. I would like to check in the function() if theres a new message so that i would now if it shoud run the “blink function” again or should turn for example the led off.
Are there any solutions.
Sorry for my English i’m german an i’m still learning.
Hi
If the function runs forever which I assume it does then you need to test State to decide on the action the function should take.
The on message callback would be used to set this state but you cannot call the function twice
So
def function():
    global state
    if state:
        "let a led blinking a bit"
    else:
        pass  # stop
def on_message(client, userdata, message):
    global state
    if msg == 1:
        state = True
        function()
    if msg == 0:
        state = False
        function()
Does that help?
Rgds
Steve
Hi,
I appreciate your content and have created Python code that works perfectly based off of this and your other posts. However, I am running into a situation where MQTT is using 100% of CPU when I use loop_start() and virtually none when I use loop_forever(). Why would it be different? Visit the link below to see very basic sample code:
https://pastebin.com/CF0BynYY
A couple of observations:
1. The code as written pegs the CPU at 100%
2. If I comment out “loop_start()”, “While True” and “pass”, and uncomment “loop_forever()”, CPU utilization drops to about 2%.
Why is the CPU usage so different between these two? Am I doing something wrong? Is there a better way to do this that I somehow missed?
TIA!
It is the while True loop that is causing your problem.
Try
while True:
    time.sleep(1)
The loop_forever() never gets executed while the while loop is in place.
Use loop_forever() if the main thread doesn’t do anything and all you do is wait for incoming messages.
However if you need to do something in the main loop as well as receive new messages then use loop_start() and hold the main thread using a while loop, e.g. pseudo code
Initialize callbacks
start loop
start while loop
    code for main loop e.g.
    sleep 1 second
    publish message
    # break from loop under a condition, could be a received message or when x messages sent
stop loop
exit
Hope it makes sense
just a simply question.
when I receive the payload with my script on_connect; this is sometimes the same as the previous one. I understand that other clients would still need to be able this last message, therefore it is not deleted. But the thing I am not sure, how to deal with duplicates. do I need to take care of that myself?
Hi
You can get duplicates but it is not common especially on a test network. I would take another look at your script to see if there s a problem with it.
If you send it to me using the ask-steve page I’ll take a look
rgds
steve
Hello Steve,
thank you for your great work, really helped me understand everything around mqtt.
However one thing is still kind of blurry, how to keep client receiving data from brooker 24/7?
Do I Simply create python script which has got
…..
client.subscribe(“test/#”)
client.loop_start()
while True:
client.on_message=on_message
……
basically without end and run it by cron with @reboot prefix so it’s started immediatelly after machine reboot?
or how you and you guys actually work with py scripts ? as they simply won’t receive anything when they ended.
That’s pretty much last piece to my growing home-puzzle 😀
Thanks
Hi
Yes that is basically what you do. You could also use the loop_forever() which does the same thing.
Rgds
Steve
thanks 😉
is it good approach to have like publishing script and subscribing script separated? Am thinking about subscibing script will be basically looping 24/7 but publishing should run once per 2mins as it reads data on regular basis.
Using homie 3.0 structure i can declare $state offline for publishing and $state ready for subscribing -> action part
Or do you recommend everyting in one go?
Hi
If the publish relies on data it receives from the subscribe then it is easier to use a single script otherwise it makes no difference.
rgds
steve
hi steve! i have 2 clients connected in 1 broker but the thing is i want to have it to loop indefinitely.
client.loop_forever()
cart.loop_forever()
but it only shows one data of my esp
Hi
Once you call one loop_forever it basically stops there and so the second is never called.
Use
client1.loop_start()
client2.loop_start()
while True:
    time.sleep(1)
Hi Steve,
Can you specify what happens if we call loop_start() before the connect() ?
Thanks!
Not really, as it usually works but occasionally it doesn’t. Don’t know why. I just encountered the problem a few times and so I always do it that way now. It may be or have been a timing issue.
Sorry sue but I tried it 10 minutes ago and it worked ok. I came across the problem over 18 months ago and all I remember was that the script didn’t work as expected I think it didn’t call the callbacks.
I put it in the tutorial in case anyone else get the problem as it was difficult to troubleshoot and doesn’t really make sense.
Hi Steve,
thank you so much for all your tutorials! I find your blog more useful than anything else for working with mosquitto broker and paho clients, really.
You say that for multiple clients we need multiple connections thus multiple loops, but does this mean that if I want a client to connect to N brokers then I have to create N client objects? Or maybe there’s a way to create N connections for one client. In my case I have a client that is expected to subscribe/pubilsh to different topics on different brokers remaining connected to them.
Thank you again,
Yanina
Hi
1 client 1 connection. A client cannot be connected to several brokers at the same time.
However you don’t need multiple loops but you do need N client objects.
What I do is
create client object
add client reference to an array
Repeat
.
Then I use a loop to work through the clients and manually call client.loop() within that loop.
I do have some example code that I wrote a while ago that checks several brokers using that technique. If you contact me on the ask-steve page I’ll email it to you.
Rgds
Steve
How are you getting documentation on what exactly a loop does?
I’m trying to read more on the official PAHO MQTT PYTHON and it’s very light. I’m having trouble understanding what exactly loop_forever() does and how loop() work in general.
Does loop() or loop_forever() continually go through main, starting a new thread each time?
or does it only go through on_connect and on_message? I don’t know why the official documentation not mention the specifics regarding this anywhere 🙁
Thanks!
def main():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect('host.local', 1883, 60)
    # Connect to the MQTT server and process messages in a background thread.
    client.loop_forever()
    # Main loop to listening
    print('Script is running, press Ctrl-C to quit…')
if __name__ == '__main__':
    main()
Hi
I looked at the actual client code.
loop_start() simply starts a new thread. The job of this thread is to look at the send and receive buffers and process them if there is data. If there is data it calls the callback function to alert your program.
Normally there is only one loop and hence 1 additional thread.
Here is the code for loop start
def loop_start(self):
    """This is part of the threaded client interface. Call this once to
    start a new thread to process network traffic. This provides an
    alternative to repeatedly calling loop() yourself.
    """
    if self._thread is not None:
        return MQTT_ERR_INVAL
    self._thread_terminate = False
    self._thread = threading.Thread(target=self._thread_main)
    self._thread.daemon = True
    self._thread.start()
You can see it starts a new thread and your main program continues, which is why you usually need a while loop in the main program to stop it ending.
However loop_forever has its own while loop, which stops the main program from ending. This is why you need to put it at the end of the main program.
Once you call loop_forever the callbacks are processed as before, but from inside that while loop in the calling thread, so nothing more happens in the main thread.
Here are the first few lines of loop_forever
def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False):
    run = True
    while run:
        if self._thread_terminate is True:
            break
Does that make more sense?
Rgds
Steve
I’ve found that client.loop_stop() just waits forever. Fortunately in my program it’s acceptable to just terminate the process, otherwise I’d be concerned with why client.loop_stop() doesn’t work.
Hi
I’ve not experienced any problems with loop_stop(). It may be that it wasn’t called in the program or that you had multiple loops.
If you still have problems, use the ask steve page to send me the code and I’ll take a look.
Hi Steve. Thanks very much for your great series on MQTT, which has enabled me to get a nice start on my home automation using Node-RED. One issue I have as a newbie is that during the loop, Paho seems to be catching and not reporting all Python run-time errors. It’s been a challenge to debug my code, as things like calling the wrong function name or even a bad string constructor are hidden. Is there a solution for this? Thanks again.
Ken
Can you use the ask steve page and, if possible, send me an example?
rgds
steve
Hello Steve
I posted a message on one of the topics asking for troubleshooting advice about connect, disconnect, connect: the second connect would not work. I forgot where I posted the original message.
I found the problem, and you already had it documented on this page:
~~~
client = mqtt.Client(cname)
client.loop_start()
client.connect(broker, port)
sometimes gives strange results.
******** Do not do this: you will not be able to do the second connect ********
Make sure loop_start() happens after connect().
~~~
and I made sure loop_stop() is called before calling disconnect()
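For anyone wanting to see that order written out, here is a small sketch. The function name connect_cycle and the hold parameter are made up for illustration; the client is assumed to be an already created Paho client, and the order of the calls simply follows what is described above.

```python
import time


def connect_cycle(client, broker, port=1883, hold=1.0):
    """One connect/disconnect cycle in the order described above."""
    client.connect(broker, port)   # 1. connect first
    client.loop_start()            # 2. then start the network thread
    try:
        time.sleep(hold)           # stand-in for the real work
    finally:
        client.loop_stop()         # 3. stop the network thread
        client.disconnect()        # 4. and only then disconnect
```

Calling connect_cycle() twice in a row on the same client gives the connect, disconnect, connect pattern that was failing.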
Thanks for the web page very helpful
Mark
I use Paho MQTT to connect to TTN in order to receive an uplink of two bytes.
It connects correctly and I receive the first message correctly, then use loop_forever to wait for the next message, scheduled 30 seconds later. However the script crashes immediately after this first uplink.
Any suggestions why?
Thanks in advance for your help,
jean-luc
—————————————————————————————————————————————
python 3.4 on raspberry jessie
——————————————–
code + crash message :
def on_connect(mqttc, mosq, obj, rc):
    print("Connected with result code:" + str(rc))
    mqttc.subscribe('+/devices/#')  # subscribe for all devices of user

# gives message from device
def on_message(mqttc, obj, msg):
    x = json.loads(msg.payload.decode('utf-8'))
    device = x["dev_id"]
    print("line23", device)
    payload_raw = x["payload_raw"]
    payload_plain = base64.b64decode(payload_raw)
    print("line26", payload_plain)
    datetime = x["metadata"]["time"]

def on_publish(mosq, obj, mid):
    print("mid: " + str(mid))

def on_subscribe(mosq, obj, mid, granted_qos):
    print("Subscribed: " + str(mid) + " " + str(granted_qos))

def on_log(mqttc, obj, level, buf):
    print("message:" + str(buf))
    print("userdata:" + str(obj))

mqttc = mqtt.Client()
# Assign event callbacks
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.username_pw_set(APPID, PSW)
mqttc.connect("eu.thethings.network", 1883, 60)
print("line36")
# and listen to server
run = True
while run:
    mqttc.loop_start()
    time.sleep(10)
    mqttc.loop_stop()
#mqttc.loop_forever()
——————————————————————————
crash message :
Exception in thread Thread-62:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 2650, in _thread_main
    self.loop_forever(retry_first_connection=True)
  File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 1481, in loop_forever
    rc = self.loop(timeout, max_packets)
  File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 1003, in loop
    rc = self.loop_read(max_packets)
  File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 1284, in loop_read
    rc = self._packet_read()
  File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 1849, in _packet_read
    rc = self._packet_handle()
  File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 2305, in _packet_handle
    return self._handle_publish()
  File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 2500, in _handle_publish
    self._handle_on_message(message)
  File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 2647, in _handle_on_message
    self.on_message(self, self._userdata, message)
  File "ttt3.py", line 23, in on_message
    device = x["dev_id"]
TypeError: string indices must be integers
Hi
Did you say you received the first message OK?
Move the loop_start() and loop_stop() calls outside the while loop, don’t use loop_forever() as well as loop_start(), and see how it goes.
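As a rough sketch of that restructuring: the function name listen and its should_run and poll parameters are made up for illustration, and the client is assumed to be an already connected Paho client with its callbacks assigned.

```python
import time


def listen(client, should_run, poll=10):
    """Start the network thread once, idle in the main thread, stop once."""
    client.loop_start()        # started once, before the while loop
    try:
        while should_run():
            time.sleep(poll)   # main thread only waits; callbacks run in the loop thread
    finally:
        client.loop_stop()     # stopped once, after the while loop
```

The try/finally is there so the loop is still stopped if the script is interrupted with Ctrl-C.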
Use the ask-steve page to send me the python console output if it crashes.
rgds
steve
Hi Steve!
Thank you for this amazing tutorial!
I have a script that is able to receive MQTT events (I use the on_message callback). My client is connected with the connect_async method. The problem for me is that the thread handling this connection waits for one message to be fully processed before it takes the next message from the incoming message buffer. Is it possible to start processing a message as soon as it reaches the buffer, without waiting for the previous message to be handled completely? E.g. by creating a new thread for message handling?
Hi
Each message will activate the callback and you can only have one callback active for that client connection at the same time so that callback has to complete before it can be triggered again.
The message can be passed to the main thread using a queue and so it can be processed while the callbacks are active.
In many of my scripts the on_message callback drops the message in a queue and I unpack the queue in another thread. I can send you an example if you want.
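A minimal sketch of that pattern is below. It is not the script mentioned above: the names messages, worker and start_worker are made up for illustration, and only the on_message(client, userdata, msg) signature is the usual Paho one.

```python
import queue
import threading

messages = queue.Queue()


def on_message(client, userdata, msg):
    """Paho-style callback: hand the message over and return immediately."""
    messages.put(msg)


def worker(handle, stop_event):
    """Take messages off the queue and process them outside the callback."""
    while not stop_event.is_set() or not messages.empty():
        try:
            msg = messages.get(timeout=0.1)
        except queue.Empty:
            continue
        handle(msg)
        messages.task_done()


def start_worker(handle):
    """Start the worker thread; returns (thread, stop_event)."""
    stop_event = threading.Event()
    thread = threading.Thread(target=worker, args=(handle, stop_event), daemon=True)
    thread.start()
    return thread, stop_event
```

Because the callback only does a put(), it returns almost at once and the network loop is free to read the next message while the worker is still busy with the previous one. Starting several workers on the same queue would let slow messages be processed in parallel.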
Rgds
steve
Hi Steve,
Thanks for shedding light on threading and multiple clients; there are not that many tutorials on it.
That said, I have some issues implementing it.
I have a broker on a gateway that the sensors send messages to, and a cloud broker that the gateway sends final results to. I have to subscribe to topics on both brokers.
Is there no other way to use threads (a cloud-based client plus multiple sensor clients)? The cloud client uses WebSockets and the sensor clients do not.
I followed your tutorial with the thread but I can’t see my messages, and client.connected_flag=True doesn’t seem to execute.
Do you have a basic example to show, please?
Here is what I’m using for the moment:
def Connect(client, who):
    if who == 0:
        client.on_message = on_scp_message
        client.on_connect = on_scp_connect
        client.on_publish = on_scp_publish
        client.on_subscribe = on_scp_subscribe
        client.tls_set(ENDPOINT_CERTIFICATE)
        client.username_pw_set(USERNAME, PASSWORD)
        client.ws_set_options(ENDPOINT_URL_PATH)
        client.connect(ENDPOINT, 443)
    # And some other settings if who = 1, without certificates etc …

for i in range(nclients):
    cname = "Client-" + str(i)
    if i == 0:
        client = mqtt.Client(DEVICE_ID, transport='websockets')
    else:
        client = mqtt.Client(cname)
    clients.append(client)

ex = futures.ThreadPoolExecutor(max_workers=10)
while True:
    for i in range(len(clients)):
        client.loop(0.01)
        if client.connected_flag == False:
            f = ex.submit(Connect, client, i)
Thanks a lot
Hi
With only two connections you might just want to start two client loops. However I will send you two scripts to your email address that use threading and futures. They are scripts for testing brokers that I’ve written and may be useful to you.
I’ll try to put together a tutorial to cover multiple client connections in the next few weeks.
rgds
steve
Hi Steve,
I managed to get the multi-client connection working thanks to the code you sent me! 🙂
thanks and cheers
well done
rgds
steve
You mentioned the default loop timeout is 1 second.
How can I decrease that?
Thanks.
You can’t change it in loop_start(), and there is no need to, as the 1 second is the maximum time it blocks before returning. As it runs in its own thread it doesn’t make a difference.
If you call the loop manually using client.loop() then you can pass in a timeout, e.g. client.loop(.01) will only block for 1/100 of a second.