Paho Python MQTT Client Changes for MQTTv5 Support

The Paho Python client version 1.5.1 includes support for MQTTv5. Because of the new capabilities of MQTTv5, many of the common functions such as connect, subscribe and publish have changed.

The aim of this tutorial is to point out the main changes, and what you need to do in your code to use the new features.

MQTT Protocol Version

If you want to use the new MQTTv5 capabilities then it is important to specify the protocol version you want to use. Supported versions are:

  • MQTTv31 = 3
  • MQTTv311 = 4
  • MQTTv5 = 5

Therefore, when creating the client for MQTTv5, use:

import paho.mqtt.client as mqtt
client = mqtt.Client("mqtt5_client", protocol=mqtt.MQTTv5)

Connection Function

MQTT v3.1.1

connect(host, port=1883, keepalive=60, bind_address="")

MQTT v5

connect(self, host, port=1883, keepalive=60, bind_address="", bind_port=0,
clean_start=MQTT_CLEAN_START_FIRST_ONLY, properties=None)

The most important addition is the properties object.

The properties object is also present in other function calls, so it is important to understand how to use it. We cover this next.

Properties Object

The properties object varies according to which function you use it in.

To use it in the connection function you first need to import the properties class and then create a properties object using the following syntax:

properties=Properties(PacketType)

The following are the available packet types (taken from the code):

CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
PINGREQ, PINGRESP, DISCONNECT, AUTH

In addition you will need to set property attributes. In the example code below we set the maximum packet size the client can receive.

Because this is done in the connection message, we use the CONNECT packet type:

properties=Properties(PacketTypes.CONNECT)

Once we have the properties object we set the attributes as shown in the code below:

from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
properties = Properties(PacketTypes.CONNECT)
properties.MaximumPacketSize = 20
client.connect(host, port, properties=properties)

To make your code work with both MQTTv5 and MQTTv3.1.1, you need to set the properties object to None when using MQTTv3.1.1.

So instead of

properties=Properties(PacketTypes.CONNECT)

you would use

properties=None
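
One way to keep this switch in a single place is a small helper that decides what to pass to connect(). This is only a sketch: connect_kwargs and build_properties are hypothetical names, and the protocol numbers are the ones listed at the start of this tutorial.

```python
MQTTV311 = 4  # protocol numbers as listed earlier in this tutorial
MQTTV5 = 5


def connect_kwargs(protocol, build_properties):
    """Return the keyword arguments to pass to client.connect().

    build_properties is a hypothetical zero-argument callable that
    creates the CONNECT properties object; it is only called for MQTTv5.
    """
    if protocol == MQTTV5:
        return {"properties": build_properties()}
    # MQTT v3.1 and v3.1.1 have no properties, so pass None.
    return {"properties": None}
```

It could then be used as client.connect(host, port, **connect_kwargs(protocol, make_props)), where make_props is your own function that builds the properties object.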

Note: MQTTv5 Properties by Message Type has a list of the property fields that are available in the connection message and other message types.

Connect Acknowledge Callback Function

The server can return a lot of information in the properties field of the CONNACK message, so the callback function must accept it. The CONNACK callbacks for v3.1.1 and v5, as defined in the latest client code, are shown below.

Old signature for MQTT v3.1 and v3.1.1 is:
on_connect(client, userdata, flags, rc)

Signature for the MQTT v5.0 client:
on_connect(client, userdata, flags, reasonCode, properties=None)

Notice the extra properties field.

However, existing code written for v3.1.1 uses the following callback signature:

on_connect(client, userdata, flags, rc)

This will still work with MQTTv3.1.1, but you will get an error with MQTTv5 because the client passes the extra properties argument.

To make the callback work with v3.1.1 and v5 connections use:

on_connect(client, userdata, flags, rc, properties=None)
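
The reason this works is plain Python: a parameter with a default value can be left out by the caller. The sketch below calls the callback directly with four and then five arguments to mimic the two protocol versions. No broker or client is involved, and the userdata dictionary is just an illustration.

```python
def on_connect(client, userdata, flags, rc, properties=None):
    """Connect callback that accepts both the v3.1.1 and v5 call shapes."""
    userdata["connected"] = True
    # properties stays None when only four arguments are passed (v3.1.1).
    userdata["has_properties"] = properties is not None


# Simulate a v3.1.1 style call (four arguments).
state = {}
on_connect(None, state, {}, 0)
v311_result = dict(state)

# Simulate a v5 style call (five arguments); object() stands in for
# the properties object the client would supply.
on_connect(None, state, {}, 0, object())
v5_result = dict(state)
```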

The following functions have also changed and you need to use the same logic as described above.

Subscribe and Subscribe Callback

MQTT v3.1.1 and v3.1

subscribe(topic, qos=0)

and for MQTT v5.0:

subscribe(self, topic, qos=0, options=None, properties=None)

Subscribe Callback

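Old signature for MQTT v3.1.1 and v3.1 is:
on_subscribe(client, userdata, mid, granted_qos)

and for the MQTT v5.0 client:
on_subscribe(client, userdata, mid, granted_qos, properties=None)

Note that with MQTT v5 the fourth argument is a list of reason codes rather than granted QoS values, although its position is unchanged.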
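
A subscribe callback that has to serve both protocol versions can normalise its fourth argument before using it. The following is a rough sketch, assuming the v5 reason code objects expose their number through a value attribute. FakeReasonCode is a hypothetical stand-in used only so the example runs without a broker.

```python
def on_subscribe(client, userdata, mid, granted_qos, properties=None):
    """Subscribe callback usable with v3.1.1 and v5 connections."""
    # v3.1.1 passes plain integers; for v5 we assume each item has a
    # numeric .value attribute, and fall back to the item itself.
    codes = [getattr(item, "value", item) for item in granted_qos]
    userdata.append((mid, codes))


class FakeReasonCode:
    """Hypothetical stand-in for a v5 reason code object."""

    def __init__(self, value):
        self.value = value


log = []
on_subscribe(None, log, 1, [0, 1])                      # v3.1.1 call shape
on_subscribe(None, log, 2, [FakeReasonCode(0)], None)   # v5 call shape
```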

Unsubscribe Callback

Old signature for MQTT v3.1.1 and v3.1 is:
unsubscribe_callback(client, userdata, mid)

and for the MQTT v5.0 client:
unsubscribe_callback(client, userdata, mid, properties, reasonCodes)

Publish Message

Old signature for MQTT v3.1.1 and v3.1 is:
publish(self, topic, payload=None, qos=0, retain=False)
and for the MQTT v5.0 client:
publish(self, topic, payload=None, qos=0, retain=False, properties=None)

Summary

Changes in the MQTTv5 protocol mean that you will need to make script changes if you want your old code to work with both protocols.


9 comments

  1. Hi Steve,

    Due to some issues with python-client we had to move our subscriber client to golang.

    We could not able to set SessionExpiryInterval while using golang client as we don’t see any property referring to that.

    It will be really helpful if you could able to let us know if the expiry can be set via golang paho-mqtt library

      1. I had to look for an alternative for python client as while using python based subscribers giving me issues when the publishing message rate is high 5000msg/sec.
        At this message rate connections is being closed, could not able to trace to who closes the connection. Both broker logs/client logs saying the other end has closed the connection.

        Due to this pods/our subscriber clients are getting restarted couple of times ( as we have client_loopforever client is reconnecting). This restart introducing message delays and duplications as well.

        Thought changing the client might help us.

          1. I was using python paho MQTTV5 client so that I can have
            clean_session: false
            sessionExpiryInterval: 30sec, when K8s pod where my client is running scales down, session will be discarded by broker.

            With the above set-up, the only issue is connection drops, which resulting in pod-restarts and end-result being data being duplicated and there is a delay.

            Our publishers will be a large fleet of devices which will publish metrics to Mosquitto broker, and this python-client/s which has the shared-subscription across all topics will read the real-time data and stored to BigQuery.
            When I refer the delay, which means the actual recorded-time vs the time it gets recorded in to the db.
            With no restarts of the pods, this delay is around 30~40secs, which is acceptable. But due to restart its increasing to 2 to 3 mins, which does not give us realtime data on the DB, on which we will run our analysis.

            Deployed python-client as kubernetes deployment, it will scale up and down based on the CPU util

          2. Hi
            Not sure what is meant by
            sessionExpiryInterval: 30sec, when K8s pod where my client is running scales down, session will be discarded by broker.

            Why do you need to disconnect?
            Rgds
            Steve

  2. typo in subscribe_callback() for the MQTT v5.0 client:
    on_subscribe(client, userdata, mid, granted_qos, properties=None)

    should be:
    on_subscribe(client, userdata, mid, reasonCodes, properties)

    1. tks for that I just checked the code and it looks like
      on_subscribe(client, userdata, mid, granted_qos,reasonCodes properties=None)
      I will need to check further
      Rgds
      Steve
