Receiving Messages with the Paho MQTT Python Client

Messages are received by the on_message callback, and so this callback must be defined and bound in the main script.

All callbacks rely on the client loop and this must also have been started using loop_start() or loop_forever(), or run manually within the main script. See Understanding the loop for more details

Provided this is the case it is easy to receive the messages and display them on the console using the example code below:

def on_message(client, userdata, message):
    print("received message =",str(message.payload.decode("utf-8")))

Message is an object and the payload property contains the message data which is binary data.

To access the message from you main script requires a little more work as the on_message callback is asynchronous and also usually run in a separate thread.

There are three common methods of doing this:

  1. Use a global variable
  2. Use a list
  3. Use the queue object

Using a Global Variable

You could use an on_message callback as show below:

def on_message(client, userdata, message):
global message_received
time.sleep(1)
message_received=str(message.payload.decode("utf-8"))

The message received variable is defined in the main script and can also be used there.

However how do you test that you have a message and what happens if you get another message before you can process the first message?

Use a List

We can declare a list in the main loop and use it in the callback . It doesn’t need to be a global variable.

The callback would look like this:

def on_message(client, userdata, message):

received_messages.append(message)#use an array

and in the main script we would have code like this

received_messages=[]
message=received_messages.pop()
print("received in array",str(message.payload.decode("utf-8")))

However this will potentially cause problems when the on_message callback is in another thread as they use a shared array.

Use the Queue object

This is by far the best option as the queue object is thread safe.

To use the queue object you must import it.

We use it very much like an array as shown in the code.

In the main loop we create a queue and extract messages from it using:

from queue import Queue

q=Queue()
#later in code
while not q.empty():
   message = q.get()
   if message is None:
       continue
   print("received from queue",str(message.payload.decode("utf-8")))

The on_message callback would look like this:

def on_message(client, userdata, message):
   q.put(message)

If you want to learn more about the queue object I recommend this pymow tutorial

This is the technique I use in my data logger scripts listed in the resources below.

The Message Payload

The actual message payload is a binary buffer. In order to decode this payload you need to know what type of data was sent.

If the payload was text data then it is decoded using the appropriate decoder (normally utf-8) as show below:

decoded_message=str(message.payload.decode("utf-8")))

If it is JSON formatted data then you decode it as a string and then decode the JSON string as follows:

decoded_message=str(message.payload.decode("utf-8")))
msg=json.loads(decoded_message)

If it is binary data e.g a file then it doesn’t need decoding.

Course Links

  1. Introduction to the Paho Python MQTT Client
  2. Introduction to the Client Class
  3. Connecting to a Broker
  4. Publishing Using The Paho Python MQTT Client
  5. Subscribing using The Paho Python Client
  6.  Receiving Messages with the Paho MQTT Python Client
  7. Understanding The Loop
  8. Understanding Callbacks
  9. Handling Multiple Client Connections
Please rate? And use Comments to let me know more

20 comments

  1. In the ‘Using a Global Variable’ example, there’s a 1 sec pause, hope it’s there for a reason.
    But what’s the reason? What happens if I remove the sleep(1) to speed it up a bit?
    Thanks

    1. It should be ok to remove it . It is probably there to show the messages in the correct order.
      I will need to revisit it as I always use a queue to pass messages and not a global.
      Rgds
      Steve

  2. I know we can check the message’s topic using message.topic, so what I do right now is to use multiple if else statements.
    Is there has other more efficent wat to do it?

    1. I assume that you are trying to find a particular topic. There is no switch in Python so if/else is probably the easiest way to do it.
      Rgds
      Steve

  3. Thank you for the tutorial.

    I am trying to use the queue-solution and there is one thing I do not understand:
    It seems that q has to be craeted outside all functions, so it is global. This seems to be necessary for the callbacks to know about q.
    Is there a way to implement queues without the need for a global q?

  4. I am using a queue it work OK but how can I handle this case:
    I put 10 message to queue and my script crash -> I lost 10 message

  5. Hi Steve,

    My IoT current sensor sends two payloads at every interval and the data needed to compute the second payload is in the first payload. I couldn’t figure out the logic to code it since I have to push each sensors data into my MySQL DB. Please help Steve. Thanks.

  6. There is a mistake in the JSON part. to actually convert the JSON string to python dictionary, you actually should use the json.loads(), instead of json.dumps().

  7. Hi Steve, I’m having a bit of a nightmare with my mqtt comms. I’m attempting to send a uint_8t* buffer of length 64000 to python from an ESP32. I receive the error ‘UnicodeDecodeError: ‘utf-8′ codec can’t decode byte 0xef in position 1: invalid continuation byte’. If I print the data without attempting to decode an exerpt of the received mqtt packet looks like this ‘b’Np\x00\x00\xa0\x0f\xff\xef\xfc\x00\xff\xf6\x00p\x0f\xff`\x0f\xffo\xf2\x00\xff\xf6\x00p\x00\x00\xa0\x0f\xff^\xcc\’ , so understandably can’t be decoded as utf-8. Do you have any clue what might be happening here? Losing my mind a bit….

    Thanks,
    Henry

    1. In the decode there is an ignore option. This is taken from one of my scripts
      def decodepage(data):
      code=”utf8″

      try:
      print(“trying to decode with”,code)
      wpage = data.decode(code,”ignore”)
      return(0,wpage) #success
      except:
      print(“Error with code “,code)

      code=”latin1”
      try:
      print(“trying to decode with”,code)
      wpage = data.decode(code,”ignore”)
      return(0,wpage) #success
      except:
      print(“Error with code “,code)
      return(-1,””) #return fail

      If you have no joy send the data to test.mosquitto.org on topic sig/test/cowan and let me know when you have done it using the ask steve page
      steves-internet-guide.com/ask-steve/

  8. I am subscribing to multiple topics which works, but the issue is that if any of the topics receive a new message it runs the entire loop which causes duplicates.

    I am storing Temp and humidity.
    I have used an analyser to make sure I am not posting duplicates.

    How can I get both values and the store to the database when the values update?

    1. Use the ask steve page and send me the code and I’ll take a look
      steves-internet-guide.com/ask-steve/

  9. I am trying to access a message via a global variable but when the program returns to the main script from the on_message function the message_received variable is not recognised. You say that the same variable should be defined in the main script but I am not sure what you mean. Please explain

    1. You declare the variable in the main script then in then on_message callback declare it again with the global prefix.
      e.g
      variableA=0 #main script
      global variableA #on_message callback
      Rgds
      Steve

Leave a Reply to steve Cancel reply

Your email address will not be published. Required fields are marked *