Paho Python MQTT Client-Understanding The Loop

When writing code using the Paho Python client you will have had to use the loop() function.

In this tutorial we will look at what it does, and why it is necessary.

When new messages arrive at the Python MQTT client they are placed in a receive buffer.

The messages sit in this receive buffer waiting to be read by the client program.

[Figure: loop function illustration]

You could program the client to manually read the receive buffers but this would be tedious.

The loop() function is a built-in function that reads the receive and send buffers, and processes any messages it finds.

On the receive side it looks at the messages, and depending on the message type, it will trigger the appropriate callback function. See Understanding callbacks.

For example if it sees a CONNACK message it triggers the on_connect() callback.

Now instead of manually reading the receive buffer you just need to process the callbacks.

Outgoing messages and message acknowledgements are placed in the send buffer.

The loop function will read this buffer and send any messages it finds.
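To make the idea concrete, here is a toy model of what a loop call does. It is only a sketch of the concept: ToyClient, its two deques and the simplified callback signatures are made up for illustration and are not Paho's real internals.

```python
from collections import deque


class ToyClient:
    """Toy stand-in for an MQTT client; not Paho's real internals."""

    def __init__(self):
        self.receive_buffer = deque()  # packets that arrived from the broker
        self.send_buffer = deque()     # packets waiting to go out
        self.sent = []                 # stands in for the network socket
        self.on_connect = None
        self.on_message = None

    def loop(self):
        """Process everything currently sitting in both buffers."""
        while self.receive_buffer:
            packet_type, data = self.receive_buffer.popleft()
            if packet_type == "CONNACK" and self.on_connect:
                self.on_connect(self, data)
            elif packet_type == "PUBLISH" and self.on_message:
                self.on_message(self, data)
        while self.send_buffer:
            self.sent.append(self.send_buffer.popleft())


events = []
client = ToyClient()
client.on_connect = lambda c, rc: events.append(("connected", rc))
client.on_message = lambda c, payload: events.append(("message", payload))

client.receive_buffer.append(("CONNACK", 0))
client.receive_buffer.append(("PUBLISH", b"21.5"))
client.send_buffer.append(("PUBLISH", b"hello"))

assert events == []   # nothing happens until the loop runs
client.loop()
print(events)         # [('connected', 0), ('message', b'21.5')]
```

The point to notice is that the callbacks only fire, and the outgoing message only leaves, when loop() is called.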

Calling the Loop Function

See network loop in docs for function reference.

The Paho Python client provides three methods:

  • loop_start()
  • loop_forever() and
  • loop().

The loop_start() method starts a new thread that calls the loop method at regular intervals for you. It also handles reconnects automatically.

To stop the loop use the loop_stop() method.

The loop_forever() method blocks the program, and is useful when the program must run indefinitely.

The loop_forever() function also handles automatic reconnects.

A loop started with loop_start() is stopped by calling loop_stop(), and loop_forever() returns when the client calls disconnect().

You should stop the loop before you exit the script.
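As a rough sketch of the connect, start loop, do work, stop loop pattern, something like the following could be used. The helper name run_with_background_loop and its parameters are my own for illustration; only connect(), loop_start(), disconnect() and loop_stop() are Paho client methods, and the order of the last two calls is one reasonable choice rather than a requirement.

```python
def run_with_background_loop(client, host, main_work, port=1883):
    """Connect, start the network thread, run main_work, then always clean up."""
    client.connect(host, port)
    client.loop_start()          # network traffic is now handled in another thread
    try:
        main_work(client)        # the script's own logic runs here
    finally:
        client.disconnect()      # ask the broker for a clean disconnect
        client.loop_stop()       # stop the loop thread before the script exits
```

Putting the clean-up in a finally block means the loop is stopped even if the main code raises an exception.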

You can also manually call the loop() method in your program.

If you do this you must remember to call it regularly.

That is, it must be in a loop, e.g.:

while True:
    # some code
    client.loop(0.1)  # blocks for up to 100 ms
    # some code

Because loop() is a blocking function I call it with a timeout. The default timeout is 1 second.

If you call the loop manually then you will need to create code to handle reconnects.
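A minimal sketch of a manual loop with basic reconnect handling is shown below. The function name pump and the keep_running and retry_delay parameters are assumptions of mine; loop() returning a non-zero code on error and reconnect() reusing the earlier connection details are the Paho behaviours it relies on.

```python
import time


def pump(client, keep_running, timeout=0.1, retry_delay=5.0):
    """Call client.loop() repeatedly; try to reconnect when it reports an error."""
    while keep_running():
        rc = client.loop(timeout)        # 0 (MQTT_ERR_SUCCESS) means all is well
        if rc != 0:
            try:
                client.reconnect()       # reuses the host/port given to connect()
            except OSError:
                time.sleep(retry_delay)  # broker still unreachable; wait, then retry
```

You would call it with something like pump(client, lambda: run_flag) after client.connect(). A real script would probably also want to log the return code and limit the number of retries.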

Important! If your client script has more than one client connection then you must call or start a loop for each client connection.

For example, if I create two clients, client1 and client2, in a script, then you would expect to see client1.loop() and client2.loop() in the script.

Implementation Note:

I have experienced strange behaviour when starting a loop before creating a connection. So

client = mqtt.Client(cname)
client.connect(broker, port)
client.loop_start()

works OK, but

client = mqtt.Client(cname)
client.loop_start()
client.connect(broker, port)

sometimes gives strange results.

Stopping the loop automatically

If you are using the loop_start() function then you will probably need to stop the loop automatically if the connection fails.

The easiest way of doing this is using the on_disconnect callback.


def on_disconnect(client, userdata,rc=0):
    logging.debug("DisConnected result code "+str(rc))
    client.loop_stop()

However you should only stop the loop if you are completely finished, and are going to exit.

Stopping the loop will stop auto reconnects unless you take steps to call the loop manually.
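If you want to keep auto reconnects, one alternative is to leave the loop running and just track the connection state in the callbacks. The sketch below assumes the Paho 1.x-style callback signatures used elsewhere on this page (newer releases can pass extra arguments), and the connected event and publish_when_connected helper are names I have made up.

```python
import threading

connected = threading.Event()   # set while the client has a live connection


def on_connect(client, userdata, flags, rc):
    if rc == 0:                 # 0 means the broker accepted the connection
        connected.set()


def on_disconnect(client, userdata, rc):
    connected.clear()           # no loop_stop() here, so auto reconnect continues


def publish_when_connected(client, topic, payload, wait=5.0):
    """Publish only if a connection is (or soon becomes) available."""
    if connected.wait(timeout=wait):
        client.publish(topic, payload)
        return True
    return False
```

The main code can then skip or delay publishing while the connection is down, and call loop_stop() only once, when the script is really finished.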

Handling Multiple Clients

If your script connects using multiple clients then each client will need a loop.

Therefore if you are using the loop_start() method then you will need to call it for each client connection.

So if you have two client connections you will need two loops, which means two additional threads.

For two clients that isn’t really a problem, but if you have several hundred client connections then you will need several hundred additional threads.

In this situation it is better to manually call the loop for each client and use thread pooling.

I will cover this in another tutorial at a later date.
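In the meantime, the simplest form of calling the loop manually for many clients is a single thread that gives each client a short turn. This is only a sketch with no thread pool; service_clients and its parameters are hypothetical names.

```python
def service_clients(clients, keep_running, timeout=0.01):
    """Give every client a short turn at processing its network traffic."""
    failed = set()
    while keep_running():
        for client in clients:
            rc = client.loop(timeout)   # short timeout so one client cannot hog the thread
            if rc != 0:
                failed.add(client)      # caller decides how to reconnect these
    return failed
```

Because each loop() call can block for up to the timeout, one pass over several hundred clients can take a few seconds, which is why you would split the list across a small pool of threads for larger numbers.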

However, for up to around 20 client connections it is easier to use the built-in loop_start() and loop_stop() functions.

To make it simpler, add the clients to a list and loop through the list to start the loops, and do the same to stop them, e.g.

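import paho.mqtt.client as mqtt

broker = "localhost"  # placeholder: set this to your broker's address
clients = []
nclients = 20
mqtt.Client.connected_flag = False  # class-level default flag for every client
# create clients
for i in range(nclients):
    cname = "Client" + str(i)
    client = mqtt.Client(cname)
    clients.append(client)
# connect each client and start its loop
for client in clients:
    client.connect(broker)
    client.loop_start()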
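Stopping works the same way, by walking the list again. A small sketch, where stop_all is a name of my own and the disconnect-then-stop order is just one reasonable choice:

```python
def stop_all(clients):
    """Disconnect every client and stop its background loop."""
    for client in clients:
        client.disconnect()   # request a clean disconnect first
        client.loop_stop()    # then stop that client's loop thread
```

Call stop_all(clients) once, just before the script exits.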

Common Questions and Answers

Q- My callbacks aren’t being called. Why is that?

A- You need to start a loop or call the loop() function to process callbacks.

Q- My script has ended but the loop still appears to be running?

A- It probably is. You need to stop the loop before exiting the script.

Q- Why call the loop manually when you can use loop_start()?

A- The main reason is when the script has lots of client connections (>20), because loop_start() needs a loop, and therefore a thread, for each client.

Summary

The built-in loop functions process the receive and send buffers to send and receive messages and message acknowledgements.

To make life easier use the loop_start() and loop_stop() functions unless you have hundreds of client connections.


30 comments

  1. Hello Steve
    I posted a message on one of the topics asking for troubleshooting advice re: connect, disconnect, connect; it would not connect. I forgot where I posted the original message.

    I found the problem and you already had it documented on this page
    ~~~
    client= mqtt.Client(cname)
    client.loop_start()
    client.connect(broker,port)
    sometimes gives strange results.
    ******** do not do this you will not be able to do the second connect -***
    make sure loop start happens after connect
    ~~~
    and I made sure loop_stop() is called before calling disconnect()
    Thanks for the web page, very helpful
    Mark

  2. I use MQTT Paho to connect to TTN in order to receive an uplink of two bytes.
    It connects correctly and I receive the first message correctly, and I use loop_forever while waiting for the next message scheduled 30 seconds later. However, the script crashes immediately after this first uplink.
    Any suggestion why?
    Thanks in advance for your help,

    jean-luc
    —————————————————————————————————————————————
    python 3.4 on raspberry jessie
    ——————————————–

    code + crash message :
    def on_connect(mqttc, mosq, obj, rc):
        print("Connected with result code:" + str(rc))
        mqttc.subscribe('+/devices/#')  # subscribe for all devices of user

    # gives message from device
    def on_message(mqttc, obj, msg):
        x = json.loads(msg.payload.decode('utf-8'))
        device = x["dev_id"]
        print("line23", device)
        payload_raw = x["payload_raw"]
        payload_plain = base64.b64decode(payload_raw)
        print("line26", payload_plain)
        datetime = x["metadata"]["time"]

    def on_publish(mosq, obj, mid):
        print("mid: " + str(mid))

    def on_subscribe(mosq, obj, mid, granted_qos):
        print("Subscribed: " + str(mid) + " " + str(granted_qos))

    def on_log(mqttc, obj, level, buf):
        print("message:" + str(buf))
        print("userdata:" + str(obj))

    mqttc = mqtt.Client()
    # Assign event callbacks
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message

    mqttc.username_pw_set(APPID, PSW)
    mqttc.connect("eu.thethings.network", 1883, 60)
    print("line36")
    # and listen to server
    run = True
    while run:
        mqttc.loop_start()
        time.sleep(10)
        mqttc.loop_stop()
    #mqttc.loop_forever()
    ——————————————————————————
    crash message :

    Exception in thread Thread-62:
    Traceback (most recent call last):
      File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
        self.run()
      File "/usr/lib/python3.4/threading.py", line 868, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 2650, in _thread_main
        self.loop_forever(retry_first_connection=True)
      File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 1481, in loop_forever
        rc = self.loop(timeout, max_packets)
      File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 1003, in loop
        rc = self.loop_read(max_packets)
      File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 1284, in loop_read
        rc = self._packet_read()
      File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 1849, in _packet_read
        rc = self._packet_handle()
      File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 2305, in _packet_handle
        return self._handle_publish()
      File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 2500, in _handle_publish
        self._handle_on_message(message)
      File "/usr/local/lib/python3.4/dist-packages/paho/mqtt/client.py", line 2647, in _handle_on_message
        self.on_message(self, self._userdata, message)
      File "ttt3.py", line 23, in on_message
        device = x["dev_id"]
    TypeError: string indices must be integers

    1. Hi
      Did you say you received the first message OK?
      Take the loop_start and loop_stop calls outside the while loop, don’t use loop_forever as well as loop_start, and see how it goes.
      Use the ask-steve page to send me the python console output if it crashes.
      rgds
      steve

  3. Hi Steve!
    Thank you for this amazing tutorial!
    I have a script that is able to receive MQTT events (I use the on_message callback). My client is connected with the connect_async method. The problem for me is that the thread responsible for handling this connection waits for one message to be processed before it can get another message from the incoming message buffer. Is it possible somehow to start processing a message as soon as it reaches the buffer, without waiting for the previous message to be handled completely? E.g. by creating a new thread for message handling?

    1. Hi
      Each message will activate the callback and you can only have one callback active for that client connection at the same time so that callback has to complete before it can be triggered again.
      The message can be passed to the main thread using a queue and so it can be processed while the callbacks are active.
      In many of my scripts the on_message callback drops the message in a queue and I unpack the queue in another thread. I can send you an example if you want.
      Rgds
      steve
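The queue approach described in the reply above might look roughly like this. It is a sketch under assumptions, not the script referred to in the reply: the messages queue, the worker function and its handle and stop_event parameters are illustrative names, and only the on_message(client, userdata, msg) signature comes from Paho.

```python
import queue
import threading

messages = queue.Queue()


def on_message(client, userdata, msg):
    # Keep the callback short: hand the message over and return immediately.
    messages.put((msg.topic, msg.payload))


def worker(handle, stop_event):
    """Run in its own thread; slow processing happens here, not in the callback."""
    while not stop_event.is_set():
        try:
            topic, payload = messages.get(timeout=0.1)
        except queue.Empty:
            continue
        handle(topic, payload)
        messages.task_done()
```

The worker would be started with threading.Thread(target=worker, args=(my_handler, stop_event)).start(), so the callback returns quickly and the loop is free to read the next message while the previous one is still being processed.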

  4. Hi Steve,

    Thanks for shedding light on threading and multiple clients; there are not that many tutorials.
    By the way, I have some issues implementing it.

    I have a broker on a gateway where the sensors send messages, and a cloud broker where the gateway sends final results. I have to subscribe to topics on both brokers.

    Is there no other way to thread (cloud-based client, multiple sensor clients)? (The cloud client uses WebSockets and the sensors do not.)

    I followed your tutorial with the thread but I can’t see my messages, and it seems that client.connected_flag=True doesn’t execute.

    Do you have an example to show for the base, please?

    Here is what I’m using for the moment

    def Connect(client, who):
        if who == 0:
            client.on_message = on_scp_message
            client.on_connect = on_scp_connect
            client.on_publish = on_scp_publish
            client.on_subscribe = on_scp_subscribe
            client.tls_set(ENDPOINT_CERTIFICATE)
            client.username_pw_set(USERNAME, PASSWORD)
            client.ws_set_options(ENDPOINT_URL_PATH)
            client.connect(ENDPOINT, 443)
        # And some other settings if who = 1, without certificates etc ...

    for i in range(nclients):
        cname = "Client-" + str(i)
        if i == 0: client = mqtt.Client(DEVICE_ID, transport='websockets')
        else: client = mqtt.Client(cname)
        clients.append(client)
    ex = futures.ThreadPoolExecutor(max_workers=10)
    while True:
        for i in range(len(clients)):
            client.loop(0.01)
            if client.connected_flag == False:
                f = ex.submit(Connect, client, i)

    Thanks a lot

    1. Hi
      With only two connections you might just want to start two client loops. However I will send you two scripts to your email address that use threading and futures. They are scripts for testing brokers that I’ve written and may be useful to you.
      I’ll try to put together a tutorial to cover multiple client connections in the next few weeks.
      rgds
      steve

      1. Hi Steve,

        I managed to do the multi-client connection thanks to the code you sent me! 🙂

        thanks and cheers

    1. No, not in loop_start(). There is no need, as the 1 second is the maximum time it blocks before returning. As it runs in its own thread it doesn’t make a difference.
      If you call the loop manually using
      client.loop() then you can pass in a timeout, e.g.
      client.loop(.01) will only block for 1/100 of a second.

  5. for each client object in the clients array do i have to associate the callbacks as part of the for loop?

    ```
    for i in range(nclients):
        cname = "client" + str(i)
        mqttc = mqtt.Client(cname)
        mqttc.on_message = cb_message
        mqttc.on_connect = cb_connect
    ```

    also why do you not choose to use loop_start

    1. The array stores the client object and the client object is already associated with the callback as your code is effectively:
      create client object
      attach client to callback
      store client object in array
      create next client
      attach client to callback
      store client object in array
      etc

    1. Yes. In the on_message callback, check that the payload length isn’t zero; if it is, don’t try to decode it.
      rgds
      steve

  6. Hello Steven! Hope that everything is fine.
    I read the posts above and I am dealing with a similar problem. When I receive a message without a payload from my node (obviously making an effort to reconnect) I get a traceback such as:
    File "", line 127, in
      client.loop_forever()
    File "C:\Users\Nikolaos\Anaconda3\lib\site-packages\paho\mqtt\client.py", line 1481, in loop_forever
      rc = self.loop(timeout, max_packets)
    File "C:\Users\Nikolaos\Anaconda3\lib\site-packages\paho\mqtt\client.py", line 1003, in loop
      rc = self.loop_read(max_packets)
    File "C:\Users\Nikolaos\Anaconda3\lib\site-packages\paho\mqtt\client.py", line 1284, in loop_read
      rc = self._packet_read()
    File "C:\Users\Nikolaos\Anaconda3\lib\site-packages\paho\mqtt\client.py", line 1849, in _packet_read
      rc = self._packet_handle()
    File "C:\Users\Nikolaos\Anaconda3\lib\site-packages\paho\mqtt\client.py", line 2305, in _packet_handle
      return self._handle_publish()
    File "C:\Users\Nikolaos\Anaconda3\lib\site-packages\paho\mqtt\client.py", line 2500, in _handle_publish
      self._handle_on_message(message)
    File "C:\Users\Nikolaos\Anaconda3\lib\site-packages\paho\mqtt\client.py", line 2647, in _handle_on_message
      self.on_message(self, self._userdata, message)
    File "", line 53, in on_message
      enc=json.dumps(a['data'])

    My scope is to reconnect, after possible disconnections. My email is nikospps@hotmail.com and the code which i use:
    def on_connect(client, userdata, flags, rc):
        print("Connected with result code " + str(rc))
        client.subscribe("application/1/node/#")  # Subscription to all nodes

    def on_message(client, userdata, msg):
        print(msg.payload.decode())
        a = json.loads(msg.payload.decode())
        enc = json.dumps(a['data'])
        # ….more data in this function….

    client = mqtt.Client()
    #pclient = mqtt.Client()
    #pclient.username_pw_set('xxxxIU0fw0CmcLnbxxxx', password=None)
    client.connect("192.168.1.18", 1883, 60)
    #pclient.connect("x.x.x.x", 1883, 60)

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    client.loop_forever()

    Thanks in advance,
    Nikos.

    1. A quick guess would be that the json module doesn’t like an empty string.
      Try testing for an empty payload before decoding. Let me know if that works.
      rgds
      steve
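A test for an empty payload could be sketched as follows. The parse_payload helper is a made-up name, and returning None for anything unusable is just one possible design; it also skips payloads that are not valid JSON, which goes slightly beyond the empty-payload check suggested above.

```python
import json


def parse_payload(payload):
    """Return the decoded JSON value, or None if the payload is empty or not valid JSON."""
    if not payload:                      # zero-length payload: nothing to decode
        return None
    try:
        return json.loads(payload.decode("utf-8"))
    except ValueError:                   # covers both bad UTF-8 and bad JSON
        return None


def on_message(client, userdata, msg):
    data = parse_payload(msg.payload)
    if data is None:
        return                           # skip the message instead of raising inside the loop
    print(data)
```

An exception raised inside on_message propagates up through the loop, as the tracebacks in these comments show, so it is worth guarding anything that parses incoming data.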

  7. Hey Steve,
    I found this very interesting.
    However, I need to understand one more concept for published data.
    I have a publisher sending some data at given interval of time lets say 3 seconds once. But on the subscriber, I have a loop based on the published data to be run. If my subscriber takes 5 seconds to execute this loop, I am unable to process the second published message as it comes in and executes only once the previous loop completes.
    Here is the snippet of what I mean:
    def on_message(mosq, obj, msg):
        if msg.payload == "Stop":
            mqttc.disconnect()
            sys.exit()
        elif msg.payload[0] == "0":
            mqtt_test_wbs.main(msg)
    Now, If my concepts are not wrong, once the message is being passed into the function call, it should return to the main where I call the on_message and look for next incoming message. This is not happening. And I am using loop_forever() for continuous connectivity.
    Please help
    Thanks in advance

  8. So if I want my loop to run forever, but I want to run other functionality at the same time (in another thread), I can just use loop_start() and let it handle the reconnects and never stop it?

    1. Yes. You connect the client and then start the loop, and you can then carry on and do other things. However, you should stop the loop before you exit the script. You normally have something like this
      connect client
      start loop
      do main code usually in a loop
      stop loop
      exit

      1. Why is it necessary to stop the loop before exiting the script? Exiting the script means the client is no longer active, right? And it shouldn’t really matter then, given the client itself doesn’t exist after exiting the script.

        1. Because the loop will still be active even when the script ends as it runs in a different thread. When you close the command console or IDE console it will be stopped.

  9. Can I bind the callback after connection?

    #broker.py
    class Broker(object):
        def getBroker(self):
            client = mqtt.Client()  # create new instance
            client.connect(broker_address)  # connect to broker
            return client

    #main.py
    broker = Broker()
    client = broker.getBroker()
    client.on_connect = myCallBackFunction()  # bind call back function
    client.loop_start()  # Start loop

      1. Okay thanks a lot. So the loop_forever() should always be started after attaching all the interested callbacks? I have added the loop_forever() at the very end of the code, so that i dont miss processing any callbacks.

        #broker.py
        class Broker(object):
            def getBroker(self):
                client = mqtt.Client()  # create new instance
                client.connect(broker_address)  # connect to broker
                return client

        #main.py
        broker = Broker()
        client = broker.getBroker()
        # bind call back function
        client.on_connect = myCallBackFunction()
        client.on_disconnect = myCallBack_on_disconnect
        client.on_subscribe = myCallBack_on_subscribe
        client.on_unsubscribe = myCallBack_on_unsubscribe
        client.on_message = StoreToDatabase()
        # call the loop at the end
        client.loop_forever()  # Start loop

  10. I have been having a lot of trouble with this on a recent project. This helps explain the expected behavior, but it really doesn’t seem to work this way in my set up.

    I’m connecting to a device that uses TLS encryption. I set this up and connect just fine, using loop_start(). The client will occasionally disconnect (randomly), and I catch this in the on_disconnect() callback, and I do call loop_stop() there (as you suggest).

    The problem is that the client is not automatically reconnected as you say should happen, and if it did, where would you call loop_start() again? The on_connect() callback would not process the CONNACK as the loop is not running.

    What I’m doing is catching the rc value in on_disconnect, and if it’s not 0, calling connect again. In my case, client.reconnect() does not work (but has to be called for some reason), then when that fails, calling loop_stop() again, and setting the client up from scratch (including TLS encryption) then calling client.connect() again works.

    It’s a kludge, and not as simple as you imply it should be. I feel like I’m missing something.

    Any explanation for the way this is working?

    1. Nick
      When you stop the loop then you stop auto reconnects so don’t stop it in the disconnect callback until you are completely finished.

      That is you don’t want the client to try to reconnect.

      I use a flag in the on_disconnect and on_connect callbacks that I toggle on and off when I have/don’t have a connection

      def on_disconnect(client, userdata, rc=0):
          print("DisConnected flags" + "result code " + str(rc) + "client_id ")
          client.connected_flag = False  # flag

      When the flag goes False I stop and wait for the connection to be made again.
      while run_flag:
          print("in main loop")
          if client.connected_flag:  # publish if have a connection
              client.publish("house/1", "test message")
          time.sleep(3)

      1. Thanks, I feel daft now, I figured that out. removing the loop_stop() in on_disconnect() resolved all my connection/reconnection issues.

        Thanks.
