Handling Multiple Client Connections - Python MQTT

It is common to require several connections either to a single broker or to several brokers.

There are several ways of achieving this, and we will examine the main ones.

First let us recap on what we require with a single connection.



  • unique client_id
  • client loop to process callbacks
  • broker address and port.

If we have multiple client connections then we normally need:

  • a unique client_id for each connection
  • a client loop for each connection
  • broker address and port details for each connection.

Multiple Clients and Threads: loop() and loop_start()

In a single-client scenario we can send and receive using a single thread if we use the client.loop() function instead of the loop_start() and loop_stop() functions.

However this isn’t normally done, as it is easier to use loop_start() and loop_stop() because they handle reconnects automatically, whereas the loop() function doesn’t and so you need to do that yourself.

If we use the loop_start() function for a single client then we have two threads: the main thread and the thread running the client loop.

Having two threads as opposed to one isn’t a problem, but if you want to implement several thousand clients then the extra thread per client is.

However for 10 or 20 clients it is probably OK.
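
To get a feel for how the thread count grows, the sketch below stands in for loop_start() with a plain background thread per simulated client and reports the change in threading.active_count(). No broker or Paho client is involved; the fake_network_loop and count_threads functions are hypothetical stand-ins written for this illustration.

```python
import threading

def fake_network_loop(stop_event):
    """Stand-in for the background loop that loop_start() would run."""
    stop_event.wait()  # idle until asked to stop

def count_threads(nclients):
    """Start one background thread per simulated client and return how many threads that added."""
    stop_event = threading.Event()
    before = threading.active_count()
    workers = [
        threading.Thread(target=fake_network_loop, args=(stop_event,), daemon=True)
        for _ in range(nclients)
    ]
    for w in workers:
        w.start()
    added = threading.active_count() - before
    stop_event.set()  # roughly what calling loop_stop() on every client would do
    for w in workers:
        w.join()
    return added

print("extra threads for 20 clients:", count_threads(20))
```

The same measurement can be taken in a real script by printing threading.active_count() after the clients have been started.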

Threading Advantages

The main advantage of using threads is speed. If your script needs to be very responsive then you will probably need threads.

Tutorial Overview

I have tried various methods of implementing multiple clients and they vary in complexity.

The main problems with multiple clients are:

  • Handling of the client connections and reconnects
  • Handling of the callbacks

We will cover:

  • Multiple clients using the loop_start() and loop_stop() functions.
  • Multiple clients using the loop() function.
  • Multiple clients using threads.

Method 1 – Multiple Client Loops Using loop_start()

With multiple client connections each client will need a loop.

Therefore if you have 2 client connections you will need two loops, which equals two additional threads.

For 20 client connections you will need twenty loops, which equals twenty additional threads.

The code below shows how we create unique client_ids and multiple loops for multiple clients:

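import paho.mqtt.client as mqtt
broker="192.168.1.159" #example broker address, change to suit
clients=[]
nclients=20
mqtt.Client.connected_flag=False #add a flag attribute to the class
#create clients
for i in range(nclients):
   cname="Client"+str(i) #unique client_id
   #written for paho-mqtt 1.x; 2.x adds a callback_api_version argument
   client=mqtt.Client(cname)
   clients.append(client)
#connect each client and start its loop thread
for client in clients:
   client.connect(broker)
   client.loop_start()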

Code explanation
1. We use an array (a Python list) to store the client connections.
2. The first for loop creates a unique client id and uses that id to create a new client object.
3. It then stores the client object reference in the array.
4. The second for loop goes through the array, connects each client and starts its client loop.
5. We now have 20 loops active and 20 additional threads (nclients=20).

How do we publish and/or subscribe?

The clients are already connected, so we just get each client object from the array:

for client in clients:
   client.subscribe(topic)
   client.publish(topic,message)

Alternatively we can subscribe in the on_connect callback.

def on_connect(client, userdata, flags, rc):

    if rc==0:
        client.connected_flag=True
        client.subscribe(topic)

Note: You don’t need separate callbacks for each client, as the callback is passed the client object that triggered it.
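
Because the client object is passed into every callback, one function can serve all connections. The sketch below illustrates the idea with stand-in objects rather than real Paho clients, so it runs without a broker; the FakeClient class and the topic value are assumptions made for the example.

```python
topic = "test/topic"  # hypothetical topic name

class FakeClient:
    """Minimal stand-in exposing only what the callback touches."""
    def __init__(self, name):
        self.name = name
        self.connected_flag = False
        self.subscriptions = []

    def subscribe(self, topic):
        self.subscriptions.append(topic)

def on_connect(client, userdata, flags, rc):
    # Same signature as the callback in the text; rc == 0 means success.
    if rc == 0:
        client.connected_flag = True
        client.subscribe(topic)

clients = [FakeClient("Client" + str(i)) for i in range(3)]
for client in clients:
    client.on_connect = on_connect          # one shared function for every client
    client.on_connect(client, None, {}, 0)  # simulate a successful connection

print([(c.name, c.connected_flag, c.subscriptions) for c in clients])
```

With real clients the assignment client.on_connect = on_connect is the same, and the library makes the call when the broker acknowledges the connection.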

Receiving Messages
Received messages are handled by the standard on_message callback:

def on_message(client, userdata, message):
   print("message received",str(message.payload.decode("utf-8")))

Notes:
The approach works well if all of the clients connect to the same broker, and do the same thing.

It is easy to modify the script to publish and subscribe to different auto generated topics using the same technique as used in creating the unique client id.
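
A minimal sketch of that idea, numbering the topics the same way as the client ids. The make_topics helper and the "test/clientN" naming scheme are assumptions for illustration, not taken from the demo scripts.

```python
def make_topics(nclients, base="test"):
    """Return (client_id, topic) pairs, numbering topics the same way as client ids."""
    return [("Client" + str(i), base + "/client" + str(i)) for i in range(nclients)]

for client_id, topic in make_topics(3):
    print(client_id, "->", topic)
```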

However using different brokers isn’t so simple.

Uses: I’ve used this style for creating stress test scripts where you need lots of clients that publish and subscribe.

Demo script: The demo script demo1.py connects and publishes and subscribes on various topics.

Received messages are displayed along with the thread count.


Modification for Multiple Brokers and Topics

In this alternative we use a Python dictionary to store each client’s connection, subscribe and publish details, which gives us more flexibility.

However the price we pay is increased initial configuration.

Code:

client1={"broker":"192.168.1.159","port":1883,"name":"blank","sub_topic":"test1","pub_topic":"test1","client":""}

For multiple clients we add these dictionaries to an array (or to another dictionary):

clients=[
{"broker":"192.168.1.159","port":1883,"name":"blank","sub_topic":"test1","pub_topic":"test1"},
{"broker":"192.168.1.65","port":1883,"name":"blank","sub_topic":"test2","pub_topic":"test2"}
]

Note: I’ve left the client_id as blank as it is better to create a unique name using techniques like this:

import time
t=int(time.time()) #remove decimal part
t=str(t) #convert to string
client_id="client1"+t
and
client_id="client2"+t
etc.

We add it to the dictionary using:

clients[0]["name"]=client_id
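
When several ids are generated within the same second the timestamp alone will not tell them apart, so a distinct prefix or index is still needed. A minimal sketch, assuming the list-of-dictionaries layout shown earlier; the assign_names helper is hypothetical.

```python
import time

def assign_names(clients, prefix="client"):
    """Fill in each entry's "name" with prefix + index + current Unix time."""
    t = str(int(time.time()))  # whole seconds, as a string
    for i, entry in enumerate(clients, start=1):
        entry["name"] = prefix + str(i) + "-" + t
    return clients

clients = [
    {"broker": "192.168.1.159", "port": 1883, "name": "blank"},
    {"broker": "192.168.1.65", "port": 1883, "name": "blank"},
]
assign_names(clients)
print([c["name"] for c in clients])
```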

You could also add the client object using:

client=mqtt.Client(client_id)
clients[0]["client"]=client

In that way each dictionary entry ties together the client object, broker, topics etc.
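
The following sketch shows one way the entries and client objects could be linked for every connection at once. To keep it runnable without a broker or the Paho package, it takes the client constructor as a parameter; the build_clients helper and the DummyClient class are assumptions made for illustration.

```python
class DummyClient:
    """Stand-in for mqtt.Client; it only remembers its client_id."""
    def __init__(self, client_id):
        self.client_id = client_id

def build_clients(configs, make_client):
    """Create a client per entry and store it under the "client" key."""
    for entry in configs:
        entry["client"] = make_client(entry["name"])
    return configs

configs = [
    {"broker": "192.168.1.159", "port": 1883, "name": "client1",
     "sub_topic": "test1", "pub_topic": "test1"},
    {"broker": "192.168.1.65", "port": 1883, "name": "client2",
     "sub_topic": "test2", "pub_topic": "test2"},
]
build_clients(configs, DummyClient)

# Each entry now links the client object to its broker and topics.
for entry in configs:
    print(entry["client"].client_id, entry["broker"], entry["sub_topic"])
```

With paho-mqtt 1.x, mqtt.Client could be passed in place of DummyClient, since its first argument is the client_id.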

Demo script: The demo script demo1a.py uses an array of dictionaries to store client data and is suitable when dealing with multiple brokers and topics. Otherwise it does the same as demo1.py.

Method 2 – Using The loop() function

If we use the loop() function we can reduce the number of threads.

The demo script demo1b.py is the same as demo1a.py but uses loop() and not loop_start().

Note: This script does not handle reconnects.

You should also note that the main while loop in the script is a bad example of how to use the loop() call, as the while loop contains lots of sleep calls.

The loop() function needs to be called often, and so it should be in a while loop with very little delay.

To counter this it is better to run the while loop that calls loop() in a separate thread.

Here is the code:

def multi_loop(nclients):
   global run_flag 
   while run_flag: 
      for i in range(nclients):
         client=clients[i]["client"]
         client.loop(0.01)

To start in a new thread use:

import threading
run_flag=True #set to False to stop multi_loop
t = threading.Thread(target=multi_loop,args=(nclients,)) #start multi loop
t.start()
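
A variation on the same idea, sketched with a threading.Event in place of the global run_flag so that the loop thread can be stopped cleanly. The StubClient class stands in for a Paho client and only counts loop() calls; it is an assumption made so the example runs without a broker.

```python
import threading
import time

class StubClient:
    """Stand-in for a Paho client: loop() just counts how often it is called."""
    def __init__(self):
        self.calls = 0

    def loop(self, timeout=0.01):
        self.calls += 1
        time.sleep(0.001)  # pretend to wait briefly on the network

def multi_loop(clients, stop_event):
    """Service every client in turn until stop_event is set."""
    while not stop_event.is_set():
        for entry in clients:
            entry["client"].loop(0.01)

clients = [{"client": StubClient()} for _ in range(3)]
stop_event = threading.Event()
t = threading.Thread(target=multi_loop, args=(clients, stop_event))
t.start()

time.sleep(0.1)   # the main thread is free to do other work here
stop_event.set()  # ask the loop thread to finish
t.join()
print([entry["client"].calls for entry in clients])
```

All clients share one extra thread here, so the total stays at two threads however many clients are added, at the cost of each client waiting its turn.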

Method 3 – Using Threads for Each Client

If you need each client to be very responsive then you will need to start a thread for each client.

The script demo2.py uses a different thread for each client connection.

The thread calls the client loop() function manually and there is no need for loop_start().

The script handles reconnects and will make three attempts before stopping that client thread.
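
The general shape of a per-client thread with a capped number of connection attempts might look like the sketch below. This is not the code from demo2.py; the FlakyClient stub, the client_thread function and the retry policy are assumptions, and a real script would call the Paho client's own connect() and loop() instead.

```python
import threading

MAX_ATTEMPTS = 3  # assumed cap, mirroring the three attempts mentioned in the text

class FlakyClient:
    """Stub whose connect() fails a fixed number of times before succeeding."""
    def __init__(self, name, failures):
        self.name = name
        self.failures = failures
        self.loops = 0

    def connect(self):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("broker unavailable")

    def loop(self, timeout=0.01):
        self.loops += 1

def client_thread(client, results, iterations=5):
    """Connect with up to MAX_ATTEMPTS tries, then service the client's loop."""
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            client.connect()
            break
        except ConnectionError:
            if attempt == MAX_ATTEMPTS:
                results[client.name] = "gave up"
                return  # stop this client's thread only
    for _ in range(iterations):  # a real script would loop until told to stop
        client.loop(0.01)
    results[client.name] = "ran"

results = {}
stubs = [FlakyClient("Client0", 0), FlakyClient("Client1", 2), FlakyClient("Client2", 5)]
threads = [threading.Thread(target=client_thread, args=(c, results)) for c in stubs]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```

A client that never connects ends only its own thread, so the remaining clients keep running.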

Total threads = number of clients + 1.

Note: If the script is started from an IDE then expect one additional thread.

The clients do nothing at present. To make them do something, edit the pub function.

You can create a different pub function for each client if needed, but you will need to modify how the threads are created if you do.


Course Links

  1. Introduction to the Paho Python MQTT Client
  2. Introduction to the Client Class
  3. Connecting to a Broker
  4. Publishing Using The Paho Python MQTT Client
  5. Subscribing using The Paho Python Client
  6. Understanding The Loop
  7. Understanding Callbacks
  8. Handling Multiple Client Connections

19 comments

  1. Hi Steve,

    Is it possible to have one client that connects to two different brokers (one local and the other public)?

  2. Hello Steve,

    For Performance test my devices (50K) needs to be in connected state for 24 Hours and all the devices should be subscribed to a topic.

    The application will send the message to these devices in the topic subscribed. My latency calculation will be from the time of Application published the messages to Getting the message in the devices.Is it possible to achieve this scenario?

    Thanks,
    Jega

    1. I have a test client that creates multiple connections. I got it to 1024 but the broker stopped accepting after that. It is a unix configuration issue to do with file handles that you need to change a setting but I never did and so I never tested beyond that.
      However the client code was running on windows and didn’t complain.
      You can obviously use multiple clients to achieve this level if needed.
      Rgds
      Steve

  3. Hello Steve,

    I have a javascript to parse the messages published by the client, which I invoke using
    client.on(‘message’, abcfunc);

    So how do I call this parser for each clientId separately so that it is faster?

      1. Hello Steve,
        this might be a bit off the MQTT track, but wanted to know if there is a way I can handle thread creation in javascript instead of python, or if I could call a javascript function from within python to reuse my code?

        1. Hi
          You could start the javascript code as an external program but that would be much slower.
          From your query I assume you want to use a Javascript client is that correct? are you using nodejs?
          Rgds
          Steve

  4. Hi Steve, thanks for the different scenarios and solutions provided in this code. Its been really helpful for persons that are new to these topics!

    Im quite lost on which type of scenario would be helpful for my case.
    Im trying to test an IoT platform that can handle multiple devices (my goal is to test 10K devices sending data to the same broker). These devices must send data every 10 seconds. Currently, Im using the code demo2.py so the responsiveness for each device is fast, but Im not sure if this can be scaled to 10K devices. Currently, Im being able to handle 200 devices connected and sending data every 10 seconds (with a little delay of 0.1 ms at most). But when I try to handle 1000 devices, I cannot connect to them and some of the return codes are 3, and eventually, the clients end up disconnecting. Using threads for each client is the best way to do it? Is there a limit to the amount of threads?

    Any recommendations on how to approach my case?

  5. I was looking for such a script that would put my IoT platform through a stress test.
    I took the liberty to adapt the script “demo2.py” so that the connection is now encrypted and a message in JSON format with ID and timestamp is sent out.

    thanks Steve without these tutorials I would never have finished programming my IoT-Platorm.

    Code:
    https://github.com/IoTree/IoTree42/tree/master/testing

    There is also an easy-to-install IoT platform.

  6. Hi Steve, your website is wonderful for learning more about MQTT. My idea is to send messages from a Python MQTT client via the MQTT broker to an ESP8266 board. Which implementation steps are required on both sides (the Python client and the ESP board)?

  7. Hi Steve
    This is very helpful. Thank you!
    I have to establish the version with Threading, so version 3 in your example.
    I am able to connect and to disconnect.
    But I cannot see that the on_message is called, means no message is printed at new messages.
    Any idea what could be the problem?

      1. Hi Steve. I just found my issue. I had a missing subscribe. Thanks anyway.
        Again, your way of explaining things in these tutorials is fantastic.
