Handling Multiple MQTT Client Connections Using Python

It is common to require several connections either to a single broker or to multiple brokers.

There are several ways of achieving this, and we will examine three of them.

First let us recap on what we require with a single connection.

  • unique client_id
  • client loop to process callbacks
  • broker address and port.

If we have multiple client connections then we normally need:

  • A unique client_id for each connection
  • A client loop for each connection
  • Broker address and port details for each connection.

Multiple Clients and Threads: loop() and loop_start()

In a single-client scenario we can send and receive using a single thread if we use the client.loop() function instead of the loop_start() and loop_stop() functions.

However, this isn’t normally done because loop_start() and loop_stop() are easier to use: they handle reconnects automatically, whereas loop() doesn’t, so you need to do that yourself.

If we use the loop_start() function for a single client then we have two threads: the main thread and the loop thread.

Having two threads instead of one isn’t a problem, but it becomes one if you want to implement several thousand clients, because each client adds a thread.

However, for 10 or 20 clients it is probably OK.

Threading Advantages

The main advantage of using threads is speed. If your script needs to be very responsive then you will probably need threads.

Tutorial Overview

I have tried various methods of implementing multiple clients and they vary in complexity.

The main problems with multiple clients are:

  • Handling of the client connections and reconnects
  • Handling of the callbacks

We will cover:

  • Multiple clients using the loop_start() and loop_stop() functions.
  • Multiple clients using the loop() function.
  • Multiple clients using threads.

Method 1 – Multiple Client Loops Using loop_start()

With multiple client connections, each client needs its own loop.

Therefore if you have 2 client connections you will need two loops, which means two additional threads.

For 20 client connections you will need twenty loops, which means twenty additional threads.

The code below shows how we create unique client_ids and multiple loops for multiple clients:

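import paho.mqtt.client as mqtt

broker="192.168.1.159" #broker address (example value, change to suit)
clients=[]
nclients=20
mqtt.Client.connected_flag=False #add a connected flag to the Client class
#create clients (paho-mqtt 1.x call style; 2.x also expects a
#callback API version as the first argument)
for i in range(nclients):
   cname="Client"+str(i) #unique client id
   client=mqtt.Client(cname)
   clients.append(client)
#connect each client and start its loop thread
for client in clients:
   client.connect(broker)
   client.loop_start()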

Code explanation
1. We use an array (a Python list) to store the client connections.
2. The first for loop creates a unique client id and uses that id to create a new client object.
3. It then stores the client object reference in the array.
4. The second for loop goes through the array and uses each client object to connect to the broker and start the client loop.
5. We now have 20 loops active and 20 additional threads (nclients=20).

How do we publish and/or subscribe?

We just take each client object from the array. The clients are already connected at this point, so there is no need to call connect() again:

for client in clients:
   client.subscribe(topic)
   client.publish(topic,message)

Alternatively we can subscribe in the on_connect callback.

def on_connect(client, userdata, flags, rc):
    if rc==0:
        client.connected_flag=True
        client.subscribe(topic)

Note: You don’t need separate callbacks for each client because each callback receives the client object as its first argument.

Receiving Messages
Received messages are handled by the standard on_message callback:

def on_message(client, userdata, message):
   print("message received",str(message.payload.decode("utf-8")))
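
Because every callback receives the client object and its userdata, one shared callback can still tell the clients apart. The sketch below is only an illustration and is not taken from the demo scripts: the helper attach_callbacks and the received dictionary are hypothetical names, and it assumes each client is tagged through paho's user_data_set() method.

```python
received = {}  # client name -> list of decoded payloads (hypothetical store)


def on_message(client, userdata, message):
    # userdata is whatever was registered for this client with user_data_set()
    name = userdata["name"]
    received.setdefault(name, []).append(message.payload.decode("utf-8"))


def attach_callbacks(client, name):
    """Tag a client with its name and register the shared callback."""
    client.user_data_set({"name": name})
    client.on_message = on_message
```

Calling attach_callbacks(client, cname) inside the loop that creates the clients would be enough to route all messages through the one function.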

Notes:
The approach works well if all of the clients connect to the same broker, and do the same thing.

It is easy to modify the script to publish and subscribe to different auto generated topics using the same technique as used in creating the unique client id.
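
As a rough sketch of that idea, the function below generates a client id and matching topics from the loop index. The function name make_client_specs, the dictionary keys and the topic prefix are assumptions chosen for illustration; they do not come from demo1.py.

```python
def make_client_specs(nclients, id_prefix="Client", topic_prefix="test/client"):
    """Build one dictionary per client with a unique id and its own topics."""
    specs = []
    for i in range(nclients):
        specs.append({
            "client_id": id_prefix + str(i),      # same technique as the client id
            "sub_topic": topic_prefix + str(i),   # topic this client subscribes to
            "pub_topic": topic_prefix + str(i),   # topic this client publishes on
        })
    return specs


for spec in make_client_specs(3):
    print(spec["client_id"], spec["sub_topic"])
```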

However using different brokers isn’t so simple.

Uses: I’ve used this style for creating stress test scripts where you need lots of clients that publish and subscribe.

Demo script: The demo script demo1.py connects and publishes and subscribes on various topics.

Received messages are displayed along with the thread count.

Modification for Multiple Brokers and Topics

In this alternative we use a Python dictionary to store each client’s connection, subscribe and publish details, which gives us more flexibility.

However the price we pay is increased initial configuration.

Code:

client1={"broker":"192.168.1.159","port":1883,"name":"blank","sub_topic":"test1","pub_topic":"test1","client":""}

For multiple clients we add them to an array or dictionary.

clients=[
{"broker":"192.168.1.159","port":1883,"name":"blank","sub_topic":"test1","pub_topic":"test1"},
{"broker":"192.168.1.65","port":1883,"name":"blank","sub_topic":"test2","pub_topic":"test2"}
]

Notes: I’ve left the client_id as blank as it is better to create a unique name using techniques like this:

import time
t=int(time.time()) #remove decimal part
t=str(t) #convert to string
client_id="client1"+t

and then "client2"+t for the second client, etc.

We add it to the dictionary using:

clients[0]["name"]=client_id

You could also add the client object using:

client=mqtt.Client(client_id)
clients[0]["client"]=client

In that way you have references between the client object, broker, topics, etc.
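
Putting those steps together, a minimal sketch of how the whole clients array could be processed is shown below. It is not the demo1a.py code: build_clients and its client_factory parameter are hypothetical, introduced so that the function does not depend on a particular paho-mqtt version.

```python
import time


def build_clients(configs, client_factory):
    """Create, store, connect and start one client per configuration entry."""
    stamp = str(int(time.time()))  # time stamp without the decimal part
    for index, cfg in enumerate(configs):
        # the index keeps the ids unique even though the time stamp is shared
        client_id = "client" + str(index + 1) + "-" + stamp
        client = client_factory(client_id)
        cfg["name"] = client_id
        cfg["client"] = client  # keep the object next to its broker and topics
        client.connect(cfg["broker"], cfg["port"])
        client.loop_start()  # one loop thread per client, as in Method 1
    return configs
```

With the paho-mqtt 1.x API, build_clients(clients, mqtt.Client) should be enough. With 2.x the factory would probably need to pass the callback API version as well, for example lambda cid: mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, cid).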

Demo script: The demo script demo1a.py uses an array and dictionary to store client data and is suitable when dealing with multiple brokers and topics. Otherwise it does the same as demo1.py

Method 2 – Using The loop() function

If we use the loop() function we can reduce the number of threads.

The demo script demo1b.py is the same as demo1a.py but uses loop() and not loop_start().

You should note it does not handle reconnects.

You should also note, and confirm by testing, that the main while loop is a bad example of using the loop() call because the while loop has lots of sleep calls.

The loop() function needs to be called often and so it should be in a while loop with very little delay!

To counter this it is better to use a separate thread to call the while loop.

Here is the code:

run_flag=True #set to False to stop the loop thread

def multi_loop(nclients):
   global run_flag
   while run_flag:
      for i in range(nclients):
         client=clients[i]["client"]
         client.loop(0.01) #service this client for up to 10ms

To start in a new thread use:

import threading
t = threading.Thread(target=multi_loop,args=(nclients,)) #multi_loop takes a single argument
t.start() #start multi loop
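
A variation on the same idea is sketched below, using a threading.Event instead of a global flag so that the loop thread can be stopped cleanly. It is an illustration only and not the demo1b.py code: start_multi_loop is a hypothetical helper, and the function takes the clients array as an argument rather than reading a global.

```python
import threading


def multi_loop(clients, stop_event, timeout=0.01):
    """Service every client's network loop in turn until asked to stop."""
    while not stop_event.is_set():
        for entry in clients:
            entry["client"].loop(timeout)  # very short timeout, called often


def start_multi_loop(clients):
    """Run multi_loop in one extra thread and return the thread and its stop event."""
    stop_event = threading.Event()
    t = threading.Thread(target=multi_loop, args=(clients, stop_event), daemon=True)
    t.start()
    return t, stop_event
```

To shut down, call stop_event.set() followed by t.join(). All clients then share one extra thread regardless of how many there are.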

Method 3 – Using Threads for Each Client

If you need each client to be very responsive then you will need to start a thread for each client

The script demo2.py uses a different thread for each client connection.

The thread calls the client loop() function manually and there is no need for loop_start().

The script handles reconnects and will attempt three reconnects before stopping that client thread.

Total threads = number of clients + 1.

Note: If the script is started from an IDE then add another thread to the count.

The clients do nothing at present. To make them do something, edit the pub function.

You can create a different pub function for each client if needed but you will need to modify how the threads are created if you do.
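
The general shape of a per-client thread that calls loop() itself and gives up after three failed reconnects could look like the sketch below. This is not the demo2.py code: run_client, start_client_threads and their parameters are hypothetical names. It assumes that loop() returns 0 on success and that reconnect() raises OSError when the broker cannot be reached.

```python
import threading
import time


def run_client(client, stop_event, max_retries=3, retry_delay=1.0):
    """Call loop() manually; stop after max_retries consecutive failed reconnects."""
    failed_attempts = 0
    while not stop_event.is_set():
        if client.loop(0.01) == 0:  # 0 is assumed to mean the loop call succeeded
            continue
        try:
            client.reconnect()  # assumed to raise OSError if the broker is unreachable
            failed_attempts = 0
        except OSError:
            failed_attempts += 1
            if failed_attempts >= max_retries:
                return False  # give up on this client
            time.sleep(retry_delay)
    return True


def start_client_threads(clients, stop_event):
    """Start one thread per client, so total threads = number of clients + 1."""
    threads = []
    for entry in clients:
        t = threading.Thread(target=run_client, args=(entry["client"], stop_event))
        t.start()
        threads.append(t)
    return threads
```

Passing a different target function for some entries is one way each client could get its own behaviour.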

download

Course Links

  1. Introduction to the Paho Python MQTT Client
  2. Introduction to the Client Class
  3. Connecting to a Broker
  4. Publishing Using The Paho Python MQTT Client
  5. Subscribing using The Paho Python Client
  6. Receiving Messages with the Paho MQTT Python Client
  7. Understanding The Loop
  8. Understanding Callbacks
  9. Handling Multiple Client Connections

42 comments

  1. Hi Steve,
    I used your Multiple Clients script and found a bug in demo2-.py of the multi-clients.7z file.
    The script checks a few times for the ‘client.bad_connection_flag’ value but it always stays ‘false’ because of the typo ‘client.badconnection_flag’ in the line ‘client.badconnection_flag=True’ in the ‘def Connect’ routine.
    Regards,
    Michel

    1. Yes. I did actually write a few scripts using asyncio it looks like this would make it easier than coding your own.I have never used it but might give it a try now that you have bought it to my attention.
      Rgds
      Steve

  2. Multiple clientes will all receive the same message, right?
    So they will process the topic N times, the same thing, right?
    So I don’t get the use of threads, messages will not be segragated among the clients

    1. Each client can connect to the same broker or different brokers and subscribe to the same topics or different topics.

      Does that make sense?
      Rgds
      Steve

  3. Hi Steve,
    Does Mosquitto client support redundant brokers. My broker has that kind of primary and backup broker configuration, can Mosquitto client maintain the connection like if primary downs client should connect to secondary and vice verse.

    Regards,
    Rahul

    1. Yes you can create two client objects and connect one to one broker and the other to the secondary broker.
      You can detect the connection status and send data to the connected broker. if both are ok you use the primary.
      If you are stuck let me know and I’ll take a look.
      Rgds
      Steve

  4. Hi Steve, and thanks for your tutorial.
    I try to write load test, there is a scenario: we make api request which triggers message come on mqtt, from this message we take specific info and after that publish message. I need a lot of clients and use your first approach for client creation. Script works, but sometimes I see random error and after it mqtt operations in all clients don’t work. Thread number in which exception occurs is always clients count I create. For example I run script for 10 clients and this Error:
    Exception in thread Thread-10:
    Traceback (most recent call last):
    File “C:\…..\Programs\Python\Python39\lib\threading.py”, line 973, in _bootstrap_inner
    self.run()
    File “C:\…..\Programs\Python\Python39\lib\threading.py”, line 910, in run
    self._target(*self._args, **self._kwargs)
    File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 3591, in _thread_main
    self.loop_forever(retry_first_connection=True)
    File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 1756, in loop_forever
    rc = self._loop(timeout)
    File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 1181, in _loop
    rc = self.loop_write()
    File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 1577, in loop_write
    rc = self._packet_write()
    File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 2464, in _packet_write
    write_length = self._sock_send(
    File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 649, in _sock_send
    return self._sock.send(buf)
    File “C:\…..\Programs\Python\Python39\lib\site-packages\gevent\_ssl3.py”, line 501, in send
    return self._sslobj.write(data)
    ssl.SSLError: [SSL: BAD_LENGTH] bad length (_ssl.c:2483)
    Maybe you have an idea how to solve this issue? thanks

      1. def on_connect(client, userdata, flags, rc):
             if rc == 0:
                 print("connected OK Returned code=", rc)
             else:
                 print("Bad connection Returned code=", rc)

         def on_message(client, userdata, msg):
             global messages
             messages[msg.topic] = msg.payload

         clients = []
         number_of_clients = 1
         mqtt.Client.connected_flag = False
         for i in range(number_of_clients):
             client_name = "Client" + str(i)
             client = mqtt.Client(client_name)
             clients.append(client)
         for client in clients:
             client.connect_async("broker address", port number)
             client.loop_start()

         client.on_connect = on_connect
         client.on_message = on_message
         client.tls_set(CA_CERT, DEVICE_CRT, PRIVATE_KEY)
         client.tls_insecure_set(True)

         def publish(payload, topic):
             result = client.publish(topic, payload, 1)
             result.wait_for_publish(40)
             if result.is_published():
                 print(f"Message was sent to the topic `{topic}`")
             else:
                 print(f"Message was NOT published to the topic `{topic}`")

        it is code of mqtt_client.py.In my load test(which runs in multiple threads) I use publish method from it:
        mqtt_client.publish(message, topic) and also subscribe( accessing one of client from mqtt_client.py like:)
        mqtt_client.client.subscribe(“commands/{}”.format(self.gateway_id))

  5. what is the maximum number of clients you can connect to the Broker?
    How many publishers can it support simultaneously?
    How many subscribers can it support simultaneously?
    Can I have many publishers and one subscriber (with subscription to many topics from multiple publishers)?

  6. I would like to simulate a system with 100 subscribers and 100 publishers, I am using mosquitto as a broker and the clients are two python scripts. On the editors side I have managed to configure it with 100 editors, using python thread, but I have doubts about implementing the same in the subscribers part. I have seen these examples that you have here, they have helped me a lot to guide me, but if I want to execute the subscribers in another script apart, I know that I must make threads for each client, my question if it can be created in this way:

    for client_id, client in all_clients:
        t = threading.Thread(target=Create_connections, args=(client, broker, port))
        t.start()
        all_threads.append(t)

    # at the end wait for end of threads
    for t in all_threads:
        t.join()

    1. Looks ok but I haven’t done this for a while.
      You don’t actually need 100 threads as they can all run in the same thread. Having separate threads should be quicker but it is not a must except when you need to connect to different brokers.
      On the subscribe side I would just use one thread.
      rgds
      steve

  7. Hi Steve awesome tutorials. I am currently having difficulty trying to publish gps data from a simulated drone to multiple other instances of drones using mqtt. I have managed to get it working to publish data to one other drone but struggling to scale this up to two drones or more. I feel my issue may be with the client id’s or rather struggling to get the listener drones to subscribe before the main one publishes its data. Do you have any recommendations or code examples that would be appropritate for this . Urgently need help with this for my thesis. Thanks and can’t wait to hear from you.

  8. Hi Steve,

    Is it possible we have one client which connects to two different broker (one is local and other is public)?

  9. Hello Steve,

    For Performance test my devices (50K) needs to be in connected state for 24 Hours and all the devices should be subscribed to a topic.

    The application will send the message to these devices in the topic subscribed. My latency calculation will be from the time of Application published the messages to Getting the message in the devices.Is it possible to achieve this scenario?

    Thanks,
    Jega

    1. I have a test client that creates multiple connections. I got it to 1024 but the broker stopped accepting after that. It is a unix configuration issue to do with file handles that you need to change a setting but I never did and so I never tested beyond that.
      However the client code was running on windows and didn’t complain.
      You can obviously use multiple clients to achieve this level if needed.
      Rgds
      Steve

  10. Hello Steve,

    I have a javascript to parse the messages published by the client, which I invoke using
    client.on(‘message’, abcfunc);

    so how do I call this parser for each of the clientId seperately for it to be faster?

      1. Hello Steve,
        this might be a bit off the MQTT track, but wanted to know if there is a way I can handle thread creation in javascript instead of python, or if I could call a javascript function from within python to reuse my code?

        1. Hi
          You could start the javascript code as an external program but that would be much slower.
          From your query I assume you want to use a Javascript client is that correct? are you using nodejs?
          Rgds
          Steve

  11. Hi Steve, thanks for the different scenarios and solutions provided in this code. Its been really helpful for persons that are new to these topics!

    Im quite lost on which type of scenario would be helpful for my case.
    Im trying to test an IoT platform that can handle multiple devices (my goal is to test 10K devices sending data to the same broker). These devices must send data every 10 seconds. Currently, Im using the code demo2.py so the responsiveness for each device is fast, but Im not sure if this can be scaled to 10K devices. Currently, Im being able to handle 200 devices connected and sending data every 10 seconds (with a little delay of 0.1 ms at most). But when I try to handle 1000 devices, I cannot connect to them and some of the return codes are 3, and eventually, the clients end up disconnecting. Using threads for each client is the best way to do it? Is there a limit to the amount of threads?

    Any recommendations on how to approach my case?

    1. You may find that the server is blocking the clients.
      For 10k clients I would run the client code on several computers maybe 500 each
      Rgds
      Steve

  12. I was looking for such a script that would put my IoT platform through a stress test.
    I took the liberty to adapt the script “demo2.py” so that the connection is now encrypted and a message in JSON format with ID and timestamp is sent out.

    thanks Steve without these tutorials I would never have finished programming my IoT-Platorm.

    Code:
    https://github.com/IoTree/IoTree42/tree/master/testing

    There is also an easy-to-install IoT platform.

  13. Hi Steve, your website is wonderfull to learn more about mqtt. My idea is to send message from mqtt python client over the mqtt broker to esp8266 board. Which implemtations steps are required on both side (python client) and the esp board ?

  14. Hi Steve
    This is very helpful. Thank you!
    I have to establish the version with Threading, so version 3 in your example.
    I am able to connect and to disconnect.
    But I cannot see that the on_message is called, means no message is printed at new messages.
    Any idea what could be the problem?

    1. Hi
      This is almost always caused by a missing client loop. Check that it is being called for each client
      rgds
      steve

      1. Hi Steve. Just now I found my issue. I had a missing subscribe. Thanks anyway.
        Again. Your way to expalin in this tutorials is fantastic.

        1. Hi Alex, I think having the same problem as you , connected but not displaying the messages. Can you further elaborate on how you solved this issue ?
