Paho Python MQTT Client – Understanding Callbacks

Callbacks are functions that are called in response to an event.

The events and callbacks for the Paho MQTT client are as follows:

  • Connection acknowledged: triggers the on_connect callback
  • Disconnection acknowledged: triggers the on_disconnect callback
  • Subscription acknowledged: triggers the on_subscribe callback
  • Un-subscription acknowledged: triggers the on_unsubscribe callback
  • Publish acknowledged: triggers the on_publish callback
  • Message received: triggers the on_message callback
  • Log information available: triggers the on_log callback

The Paho client is designed to only use the callback functions if they exist, and doesn’t provide any default callback functions.

However, it does provide a mechanism to set them.

To use a callback you need to do two things:

  1. Create the callback function
  2. Assign the function to the callback.

Callback Function Example

As an example we will use the on_connect callback.

First I create my function. I’ve called it on_connect but I could have called it anything I wanted.

Here is the function.

import logging

def on_connect(client, userdata, flags, rc):
    # Log the connection flags and result code, then set a flag on the client.
    logging.info("Connected flags " + str(flags) + " result code " + str(rc))
    client.connected_flag = True

The function simply logs a message, and sets a flag.

Now to associate my function with the on_connect callback I use the following:

client.on_connect = on_connect

Note: If my function had been called myfunction then I would use the following:

client.on_connect = myfunction

Callbacks and the Client Loop

Callbacks depend on the client loop: without the loop the callbacks aren’t triggered.

Below is the code of a simple Python script that creates a connection and then waits in a loop for the on_connect callback to be triggered.

The on_connect callback then sets the flag that terminates the loop and the script ends.

This means that if the callback isn’t processed then the while loop never ends as we shall see later.

[Image: callback-on_connect-example]

Here is the output generated by the script.

Notice how the loop executes waiting for the callback and stops once the callback gets processed.

[Image: callback-on_connect-result-example]

In the script above the client.loop_start() method starts a loop to process callbacks.

So let’s run the above script again, but this time I won’t start the loop. To do that I simply comment out the client.loop_start() line.

[Image: callback-loop-comment-out]

Here is what happens when I run the script.

[Image: callback-loop stopped]

You can see that I had to manually terminate the script as it never processed the callback, and the callback is what terminates the while loop. You can download this script below:

download
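One way to stop a wait loop from running forever when the callback never fires is to give it a deadline. The sketch below is not the script shown above. It uses a hypothetical stand-in object instead of a real Paho client so that it runs without a broker, and the helper name wait_for_flag and the timeout values are assumptions chosen for illustration.

```python
import threading
import time
from types import SimpleNamespace


def wait_for_flag(client, timeout=2.0, poll=0.05):
    """Poll client.connected_flag until it is True or the timeout expires.

    Returns True if the flag was set in time, otherwise False.
    """
    deadline = time.monotonic() + timeout
    while not client.connected_flag:
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll)
    return True


def on_connect(client, userdata, flags, rc):
    # Same idea as the callback in the article: record that we are connected.
    client.connected_flag = True


# Hypothetical stand-ins for a Paho client, used only so the sketch runs offline.
with_loop = SimpleNamespace(connected_flag=False)
without_loop = SimpleNamespace(connected_flag=False)

# Simulate a running loop: something calls on_connect shortly after we start waiting.
timer = threading.Timer(0.1, on_connect, args=(with_loop, None, {}, 0))
timer.start()
connected = wait_for_flag(with_loop, timeout=2.0)
timer.join()

# Nothing ever calls the callback here, which mimics a script with no loop running.
timed_out = not wait_for_flag(without_loop, timeout=0.3)

print("with loop, connected:", connected)
print("without loop, gave up:", timed_out)
```

With a real client the same helper could be called after connect() and loop_start(), and the script could then report an error instead of hanging.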

How Callbacks Work

If you dig into the Client code you will find the code for the Connect acknowledge message.

When the client receives a CONNACK message the callback is triggered if it exists.

Here is the relevant section of the code (around line 276).

[Image: client-callbacks]

Notice the if on_connect statement. This checks whether the callback has been assigned and, if it has, calls it as self.on_connect().

The self.on_connect function is the function we assigned earlier using:

client.on_connect = on_connect

You should see from the code that the function call passes several arguments, and so the function we create needs to be able to handle those arguments.

If you look back at the function I created you can see that it handles 4 arguments. The documentation has a description of those arguments:

[Image: callback-connect]
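The "call it only if it exists" pattern can be sketched in a few lines. The MiniClient class and its handle_connack method below are hypothetical and are not the Paho source code. They only mimic the check described above and show why the assigned function must accept the arguments that are passed to it.

```python
class MiniClient:
    """Hypothetical stand-in that mimics the 'call it only if it is set' check."""

    def __init__(self, userdata=None):
        self.userdata = userdata
        self.on_connect = None  # no default callback is provided

    def handle_connack(self, flags, rc):
        # Only call the callback if the user assigned one.
        if self.on_connect:
            self.on_connect(self, self.userdata, flags, rc)


calls = []


def on_connect(client, userdata, flags, rc):
    calls.append((flags, rc))


client = MiniClient()
client.handle_connack({"session present": 0}, 0)  # no callback set: nothing happens
calls_before_assignment = len(calls)

client.on_connect = on_connect
client.handle_connack({"session present": 0}, 0)  # callback set: it is called

# A function with the wrong number of parameters fails when it is called.
client.on_connect = lambda client, userdata: None
try:
    client.handle_connack({}, 0)
    wrong_signature_failed = False
except TypeError:
    wrong_signature_failed = True

print(calls_before_assignment, calls, wrong_signature_failed)
```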

Asynchronous or Synchronous?

If you use the loop_start() function then the loop runs in a separate thread, and it is the loop that processes the incoming and outgoing messages.

In this case the callbacks can occur at any time in your script and are asynchronous. The loop_forever() function also processes messages and triggers callbacks, but it blocks and runs the loop in the thread that calls it.

However, if you call the loop() function manually in the script then the callbacks are synchronous with your script, as they can only occur when you call the loop() function.

See Understanding the Loop Function
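The difference can be illustrated without a broker. The MiniLoopClient class below is a hypothetical stand-in whose method names mirror loop(), loop_start() and loop_stop(), but whose internals are invented for this sketch. It records which thread runs the callback in each mode.

```python
import queue
import threading
import time


class MiniLoopClient:
    """Hypothetical stand-in: pending events are dispatched only by loop()."""

    def __init__(self):
        self.on_message = None
        self._events = queue.Queue()
        self._stop = threading.Event()
        self._thread = None

    def inject(self, payload):
        # Pretend a message arrived from the network.
        self._events.put(payload)

    def loop(self, timeout=0.1):
        # Dispatch at most one pending event in the calling thread.
        try:
            payload = self._events.get(timeout=timeout)
        except queue.Empty:
            return
        if self.on_message:
            self.on_message(self, None, payload)

    def loop_start(self):
        # Run loop() repeatedly in a background thread.
        self._thread = threading.Thread(target=self._run, name="loop-thread")
        self._thread.start()

    def _run(self):
        while not self._stop.is_set():
            self.loop(timeout=0.05)

    def loop_stop(self):
        self._stop.set()
        self._thread.join()


seen = []
main_name = threading.current_thread().name


def on_message(client, userdata, payload):
    seen.append((payload, threading.current_thread().name))


client = MiniLoopClient()
client.on_message = on_message

# Synchronous: the callback cannot run until we call loop() ourselves.
client.inject("first")
seen_before_loop = list(seen)
client.loop()

# Asynchronous: a background thread calls loop() for us.
client.loop_start()
client.inject("second")
deadline = time.monotonic() + 2.0
while len(seen) < 2 and time.monotonic() < deadline:
    time.sleep(0.01)
client.loop_stop()

print(seen)
```

The first callback runs in the thread that called loop(), while the second runs in the background thread, which is why shared data needs care in the asynchronous case.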

Returning Values from Callbacks

Due to the way they operate, it is not possible to return values from a callback using the standard return statement: the client calls the callback itself and ignores anything it returns.

Therefore, if you need to get status information from a callback you need to use some form of global variable in the callback.

You have several options available:

  • Declare a global variable using the global statement
  • Use a list declared in the main function.
  • Use a dictionary declared in the main function.
  • Use an Object declared in the main function.
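The four options can be sketched side by side. The variable names below (last_rc, rc_list, status) are made up for illustration, and a SimpleNamespace stands in for the client object so the example runs without a broker.

```python
from types import SimpleNamespace

# Option 1: a module-level variable, rebound with the global statement.
last_rc = None

# Options 2 and 3: mutable containers created in the main part of the script.
rc_list = []
status = {}

# Option 4: an object; here a stand-in for the client object itself.
client = SimpleNamespace(connected_flag=False)


def on_connect(client, userdata, flags, rc):
    global last_rc       # needed because the name is rebound
    last_rc = rc
    rc_list.append(rc)   # mutating a list needs no global statement
    status["rc"] = rc    # the same applies to a dictionary
    client.connected_flag = (rc == 0)


# Simulate the client calling the callback after a successful connection.
on_connect(client, None, {}, 0)

print(last_rc, rc_list, status, client.connected_flag)
```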

If you refer back to the on_connect callback example, I used connected_flag, which is an attribute I added to the client object.

Because the client object is available throughout the script it can be accessed anywhere in the script.

So in the main script I use:

while not client1.connected_flag:
    print("waiting for connect")
    time.sleep(0.5)

Which waits for the connection to be established before proceeding.

To access received messages I tend to use a message queue which is thread safe, but you can also use a simple list.

To use a queue you need to import the module and create a queue:

from queue import Queue
q = Queue()

Then, in the on_message callback, you place the received message in the queue.
For a list, you declare the list outside the on_message callback, which makes it available inside the callback; there is no need for a global statement.

The following code shows how to use both methods in the on_message callback:

messages = []  # list declared outside the callback

def on_message(client1, userdata, message):
    # No global statement is needed because the list is only mutated.
    m = message.payload.decode("utf-8")
    messages.append(m)  # put message in list
    q.put(m)  # put message on queue
    print("message received  ", m)

The following code shows how to retrieve the messages using both methods in the main part of the script:

while len(messages) > 0:
    print(messages.pop(0))
while not q.empty():
    message = q.get()
    print("queue: ", message)

If I run the example script you see the following:

[Image: python_callback_example]
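For readers who want to try the queue hand-off without a broker, here is a minimal, self-contained sketch. It is not the demo script: the fake_loop_thread function and the SimpleNamespace message objects are stand-ins for the client loop and for real MQTT messages, and the topic name is invented.

```python
import threading
from queue import Empty, Queue
from types import SimpleNamespace

q = Queue()


def on_message(client, userdata, message):
    # Decode the payload and hand it to the main thread through the queue.
    q.put((message.topic, message.payload.decode("utf-8")))


def fake_loop_thread():
    # Stand-in for the client loop delivering three messages.
    for i in range(3):
        msg = SimpleNamespace(
            topic="house/sensor1",
            payload=f"reading {i}".encode("utf-8"),
        )
        on_message(None, None, msg)


worker = threading.Thread(target=fake_loop_thread)
worker.start()

received = []
while len(received) < 3:
    try:
        # Block briefly for the next item instead of polling q.empty().
        received.append(q.get(timeout=1.0))
    except Empty:
        break
worker.join()

print(received)
```

Using q.get() with a timeout is a design choice made for this sketch; it lets the main thread sleep until a message arrives rather than spin on q.empty().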

The demo script for processing messages in the callback is here:

download

Common Questions and Answers

Q- Why is my callback not being triggered?

A- Have you assigned the function to the callback? Are you running a loop for that client?

Q- My callback returns a value but I don’t see it?

A- Because of the asynchronous nature of callbacks it doesn’t make sense to return a value from them.

Video- Python MQTT Client The Loop and Callbacks Explained

Course Links

  1. Introduction to the Paho Python MQTT Client
  2. Introduction to the Client Class
  3. Connecting to a Broker
  4. Publishing Using The Paho Python MQTT Client
  5. Subscribing using The Paho Python Client
  6. Receiving Messages with the Paho MQTT Python Client
  7. Understanding The Loop
  8. Understanding Callbacks
  9. Handling Multiple Client Connections

41 comments

  1. Hi Steve! Great work as always! Does the on_message callback have the ability to store the messages that were received during connection interruption? Or there must be connection before it is called?

  2. Hi, thanks for the article! Is it mandatory to define the callback functions? What if I don’t define them? Will I get any output about the reason for a disconnect if I do not define the “on_disconnect()” callback?

    1. No, callbacks are not mandatory, but some are necessary depending on what the script does. If you don’t define the on_message callback then you won’t receive any incoming messages.
      The on_disconnect callback is rarely used.
      You also need to have a client loop to process incoming messages.
      Rgds
      Steve

  3. Hi Steve,

    Amazing work you are doing.

    I had a concern regarding my code. I am trying to send the subscribed data to a CAN bus, for which I need to get the data out of the on_message function. I tried the following code:

    def on_message(client, userdata, message):
        m=str(message.payload.decode("utf-8"))
        q.put(m)

    while not q.empty():
        results = q.get()
        print(results)

    It’s not giving me any output though. I am receiving data from the MQTT broker.

    1. Hi
      That is the code I use. You need to define q outside of the on_message callback.
      If you still have problems then use the ask-steve page. Then you can email me your code and I’ll take a quick look.
      Rgds
      Steve

  4. Is there a way to modify on_message to take more parameters (by inheritance or similar), specifically a callback? I am trying to use a function in on_message directly to process data in the message (to avoid delays), but at the same time I would like to support different devices with different message payloads easily through a callback to a function that handles the instantiated device’s data (without putting knowledge about the payload format in the on_message code).

    1. Hi
      Never tried it. What extra parameters do you need? Can’t you call the function from the on_message callback as the device handler?
      Rgds
      Steve

  5. Hello,
    Thank you for brilliant material!
    I was wondering whether or not callbacks can be blocking each other.
    I’m using loop_start(), so that callbacks run in the background, processing all messages and subscriptions.
    Inside on_connect callback, I’m using subscription to some topics, but I’m also making sure that SUBACK was received via waiting for on_subscribe callback to change some values so that I know that this SUBSCRIBE message got responded to.
    The problem is, that I have to wait for the on_subscribe callback to be called while I’m in on_connect callback, and this is not working out for me. If I don’t wait for SUBACK, everything works, but if I introduce a loop inside on_connect, then I don’t receive SUBACK message at all.
    So the question is, are callbacks blocking each other from executing? Is there any way to do what was intended in the first place?
    Thanks

    1. Hi
      Don’t put the wait in the callback, as the callback is only called in response to the CONNACK.
      You can subscribe there but wait for it in the main loop.

  6. Thanks for your informative post. It is very helpful!

    May I know how to pass some variables as the argument of the callback function?
    For instance, I want something like:
    ———————————————-
    def on_message(client, userdata, message, myVariable):
    ———————————————-

    I can only think of using the global variable inside the on_message. But is there a cleaner way?

    YH

    1. Hi
      The easiest way is to use the client object and you can simply extend it as I do using the flags connected_flag.
      Because it is an object you can use it anywhere in the script.
      Does that make sense?
      Rgds
      Steve

  7. Hi Steve, thank you for all you tutorial. When I subscribe to a topic, I print the message payload inside the on_message function, but I was wondering, if I can save the result inside a variable so I can use the message receive from outsite the on_message function… it could be very useful for one of my needs… Thank you

    1. Hi
      Yes, just use a global variable, an object or a queue.
      I use a queue in most of my scripts.
      from queue import Queue
      q=Queue()

      In on_message:
      q.put(data)

      To read it use:
      while not q.empty():
          results = q.get()

      Rgds
      Steve

      1. Hi, I get a value from esp8266 to raspberry under on_message function how to access that from outside the on_message function?? Please help college project…

        1. You need to use some form of global variable in the on_message callback. In python I use a list (array) in C you could use arrays but it is a while since I did any C programming.

  8. Hi Steve!
    What about waiting for the on_connect callback to report rc=0 (connection established) before subscribing or publishing to topics?
    Is it necessary?
    Thank you! Your articles really helped me

    1. Hi Steve, is me again.
      According to https://www.eclipse.org/paho/clients/python/docs/

      “Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions will be renewed”

      I think its cause more impact if we subscribe a topic with qos=1, retain flag=True, Clean Session Flag=False, because when subscriptions are renewed, we receive again all messages from the broker, right?

      1. Hi
        When you re-subscribe you will only receive retained messages, and there is only one retained message per topic.
        Using a clean session of false would mean that you didn’t need to re-subscribe if the connection was lost and re-established.
        You don’t necessarily need the qos=1.
        Using the qos=1 would mean that the broker would send any messages that have been sent to the topic while the client was reconnecting.
        rgds
        steve

    2. The on_connect callback doesn’t return a value, so you would need a flag set in the callback function.
      rgds
      steve

  9. hey,
    How can I access the data, send through the on_message() function, in the main body?
    ex-
    broker_ip="192.168.43.201"
    s=[0]
    def on_connect(client,userdata,flags,rc):
        client.subscribe("UR")
    def on_message(client,userdata,msg):
        global s
        s.append(msg.payload)

    client=mqtt.Client()
    client.connect(broker_ip,1883,60)
    client.on_connect=on_connect
    client.on_message=on_message
    print (s)
    client.loop_forever()
    Here, I’m not able to append my sensor data values to the s list. Can you please help me?

      1. Thanks for the quick response.
        Sensor data came to the on_message function. I tried several methods to return those values. But I couldn’t. I will check your queue tutorial. Thank you very much.

  10. Hello, (Sorry for the translation, I’m French).

    Your site is very clear and I thank you for it.
    I use callback functions. It works very well when the (QOS=0 and clean_session=False).
    I wish when the client reconnects to be able to retrieve the last state in the different callback.
    The expected function is not great, when he reconnects he receives the last message that triggers a relay. The problem is that if my relay is “up”, it switches to “down” in the current instant and goes back to “up”. It’s as if it receives the same message all the time.
    The ideal operation would be that if the last message is “ON” then it stays on “ON” without interruption of half a second (relay slamming from ON to OFF then ON) every minute.

    Thank you for your feedback.
    This is part of my code :
    ##########################################

    def on_connect(mqttc, userdata, flags, rc):
        mqttc.subscribe("Maison/Ext/Arrosage/#",qos=2)

    ###################################################

    def on_message_rel1(mosq, obj, msg):
        Mqttrelai1 = msg.payload
        if Mqttrelai1 == "OFF":
            GPIO.output(relai1, GPIO.HIGH)
            mqttc.publish(TOPIC11, "#Relai1 off")
        elif Mqttrelai1 == "ON":
            GPIO.output(relai1, GPIO.LOW)
            mqttc.publish(TOPIC11, "#Relai1 on")
        else:
            print("temp: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

    ############################################################

    def on_message_rel2(mosq, obj, msg):
        # This callback will only be called for messages with topics that match
        # $SYS/broker/messages/#
        #print("temp: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        Mqttrelai2 = msg.payload
        if Mqttrelai2 == "OFF":
            GPIO.output(relai2, GPIO.HIGH)
            mqttc.publish(TOPIC22, "Relai1 off")
        elif Mqttrelai2 == "ON":
            GPIO.output(relai2, GPIO.LOW)
            mqttc.publish(TOPIC22, "Relai1 on")

        print("temp: "+msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

    ############################################################

    mqttc = mqtt.Client("Arrosage_potager",clean_session=False, userdata=None)
    mqttc.message_callback_add(TOPIC1, on_message_rel1)
    mqttc.message_callback_add(TOPIC2, on_message_rel2)
    mqttc.message_callback_add(TOPIC3, on_message_rel3)
    mqttc.message_callback_add(TOPIC4, on_message_rel4)
    mqttc.message_callback_add(TOPIC5, on_message_relG)
    mqttc.message_callback_add(TOPICdead, on_message_dead)
    mqttc.on_disconnect = on_disconnect
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.username_pw_set(username=MQTT_USER , password=MQTT_PSWD)
    mqttc.connect_async("192.168.1.99", 1883, 60)
    mqttc.loop_start()

    while True:
        time.sleep(0.5)
        pingping("192.168.1.99")

    1. Hi
      Can I just confirm what you are trying to do.
      You have a device whose state is on. It goes offline, and when it comes back online it retrieves the last message.
      Now, if the last message was on you want it to stay on, and if it was off you want it to go off.
      Is that correct?

      1. That’s right, the underwriting works.
        Topic 1 = message ON
        Relay 1 = Subscribed to Topic 1 = ON
        so my relay one is on.

        Operation OK in QOS=0. No customer reconnection every minute.

        I wanted that when my object with a loss of power it can recover the last state of TOPIC 1, is I put QOS=2 as well as Clean_session=False.

        Operation not OK in QOS=2. The customer subscribes a first time on the TOPIC 1 is the relay 1 is on ON.
        Then at the end of a certain time (approximately 1 minute) a reconnection is made is a new subscription this product. this repeats itself unceasingly every minute.
        The reconnection causes relay=ON then OFF and immediately ON.

        I hope I am accurate.

        Thank you.

        Translated with http://www.DeepL.com/Translator (free version)

        1. Hi
          I sent you an email and guess you didn’t get it so here it is

          Still not quite sure of the setup.
          Is your relay a client subscribing to topic 1?
          How is the relay controlled?
          What is a customer reconnecting? Is that the relay reconnecting?
          Why QOS 2?
          A diagram would be helpful if you have one.
          Rgds
          Steve

  11. Hey. Your videos were extremely helpful!
    I am trying to write code so that two systems (Rpi to Rpi) can publish and subscribe to each other.
    Suppose I enter ‘1’: one system should publish data to the other Rpi, and vice versa.
    When data is published, the other system should simply print it. All this should happen simultaneously.

    Here are my codes:
    https://github.com/pareshkurdekar/Raspi_Wireless_Communication/blob/master/Rpi_to_Rpi_Server.py
    https://github.com/pareshkurdekar/Raspi_Wireless_Communication/blob/master/Rpi_to_Rpi_Client.py

    If you can help me out, I’d be very grateful.
    Thank you

  12. Hi Steve,
    I am having trouble with a problem that on_connect and on_disconnect.
    I am coding for the error cases of connect to HTTPS handling with TLS/SSL has been set, e.g.: Invalid certificates, revoke certificate on AWS, or deny the IoT connect on AWS. However I cannot get on_connect callback called in these error cases, instead of on_disconnect called immediately, observe the debug log of Paho MQTT, I didn’t see any CONNACK message received, dig into the Paho MQTT code, I found that PahoMQTT would call on_disconnect itself in _loop_rc_handle() while rc is above 0. Do you have any comments about this result? MY expected result should be call on_connect() with rc is not CONNACK_ACCEPTED for these error cases.
    In normal case (with valid certificates), my client is able to connect with IoT success.
    Any comments about this issue is appreciated. Thanks in advance.

    – Sean

    1. Does the client throw an error message when it fails to connect? Can you send me a screenshot of the connection attempt and I’ll take a quick look. Use the ask-steve page.

  13. Steve

    Really great tutorial. This makes a complex subject easy to understand. Could you expand on the difference between loop_start() and loop_forever()?

    Peter

  14. Hi Steve,
    Thank you for your last response!
    I have a problem understanding callbacks and loops.
    I just want to create a function, that returns the last sent message on a topic every time the function itself is activated. I need to send the information of actual speed of my EV3_0 to its follower EV3_1, so that the follower copies the speed or can work with it.

    e.g. for a code in the follower EV3_1

    EV3_0_ID = "192.168.15.148" #ID of the predecessor
    client = mqtt.Client("EV3_1")
    client.connect(EV3_0_ID) #connect to the predecessor
    while True:
        v_0 = get_speed() #needed function here to get the speed of EV3_0, that was sent last on that topic
        v_1 = v_0 #copy the speed of the predecessor
        follow_line(v_1) #function for the EV3 to follow a line with a given speed

    The get_speed() function should work as fast as possible and do not have time to wait for any loops, because the following follow_line(v) function must be called with high frequency for a stable behavior.

    If I try it with the functions explained on your site (on_message, on_connect, loop_start, loop_stop), the follow_line() function works too slowly. So what should be inside the get_speed() function?

    I hope you can help me with that. I think the needed code is simple but everything I try, is not working as it should work.

    Kind regards
    Artur

  15. Thank you very much for this great article, please how I can stop the reconnect loop, I’m processing the callback, but I want if the connection is broken I don’t wanna the connection to be established another time so if the connection is broken then the script will throw an error, how I can do this and thank you in advance

    1. Hi
      I’m assuming that you have used the loop_start() to start the loop.
      If so use the on_disconnect callback to detect the disconnect and in the callback use loop_stop() to stop the loop.
      Stopping the loop stops re-connection attempts.
      Let me know how you get on.
      Rgds
      Steve

  16. Hello Steve,

    Is there any difference between a callback and a callback function? Is the callback the one on the left (client.on_connect) and the callback function the one we wrote?
    Also, a question probably related to programming: why don’t we have parentheses on either side of the functions? e.g.
    client.on_connect= on_connect

    i believe on_connect on the left side and on the right side both are functions. so how is the flow here, i means from right side normally we do assignment.
    sorry for very basic question but would help me to understand the programming a bit
