It is common to require several connections either to a single broker or to multiple brokers.
There a several ways of achieving this and we will examine two ways.
First let us recap on what we require with a single connection.
- unique client_id
- client loop to process callbacks
- broker address and port.
If we have multiple client connections then we normally need a:
- unique client_id for each connection
- A client loop for each connection
- Broker address and port details for each connection.
Multiple Clients and Threads loop() and loop_start()
On a Single client scenario we can send and receive using a single thread if we use the client.loop() function instead of the loop_start and loop_stop functions.
However this isn’t normally done as it is easier to use the loop_start() and Loop_stop() as they handle reconnects automatically, whereas the loop() function doesn’t and so you need to do that yourself.
If we use the loop_start() function for a single client then we have two threads.
Having two threads as opposed to one isn’t a problem but if you want to implement several thousand clients then it is.
However for 10 or twenty clients then it is probably OK
Threading Advantages
The main advantage of using threads is speed. If your script needs to be very responsive then your will probably need threads.
Tutorial Overview
I have tried various methods of implementing multiple clients and they vary in complexity.
The main problem with multiple clients is:
- Handling of the client Connections and reconnects
- Handling of the callbacks
We will cover:
- Multiple clients Using Loop_start() and loop_stop() functions.
- Multiple clients Using Loop() function
- Multiple Clients using Threads.
Method 1 – Multiple Client Loops.-Loop_start()
For multiple client connections then each client will need a loop.
Therefore if you have 2 client connections you will need two loops which equals two additional threads.
For 20 client connections you will need twenty loops which equals twenty additional threads.
The code below show how we create unique client_ids and multiple loops for multiple clients,
import paho.mqtt.client as mqtt clients=[] nclients=20 mqtt.Client.connected_flag=False #create clients for i in range(nclients): cname="Client"+str(i) client= mqtt.Client(cname) clients.append(client) for client in clients: client.connect(broker) client.loop_start()
Code explanation
1. We use an array to store the client connections.
2. The first for loop create a unique client id and uses that id to create a new connection.
3. It then stores the client connection reference in an array.
4. The second for loop goes through the connection array and uses the client object to create the client loop.
5. We now have 20 loops active and 20 additional threads. (nclients=20).
How do we publish and/or subscribe?
We just get client object from the array
for client in clients: client.connect(broker) client.subscribe(topic) client.publish(topic,message)
Alternatively we can subscribe in the on_connect callback.
def on_connect(client, userdata, flags, rc): if rc==0: client.connected_flag=True client.subscribe(topic)
Note: You don’t need separate callbacks for each client as the callback contains the client object
Receiving Messages
Uses the standard on_message callback
def on_message(client, userdata, message): print("message received",str(message.payload.decode("utf-8")))
Notes:
The approach works well if all of the clients connect to the same broker, and do the same thing.
It is easy to modify the script to publish and subscribe to different auto generated topics using the same technique as used in creating the unique client id.
However using different brokers isn’t so simple.
Uses: I’ve used this style for creating stress test scripts where you need lots of clients that publish and subscribe.
Demo script: The demo script demo1.py connects and publishes and subscribes on various topics.
Received messages are displayed along with the thread count.
Example :
Modification for Multiple Brokers, and Topics
In this alternative we use a Python dictionary to store the client details connection,subscribe and publish details which gives use more flexibility.
However the price we pay is increased initial configuration.
Code:
client1={broker1:192.168.1.159,port:1883,name:”blank”,sub_topic:”test1″,pub_topic:”test1″,client=””}
For multiple clients we add them to an array or dictionary.
clients=[ {"broker":"192.168.1.159","port":1883,"name":"blank","sub_topic":"test1","pub_topic":"test1"}, {"broker":"192.168.1.65","port":1883,"name":"blank","sub_topic":"test2","pub_topic":"test2"} ]
Notes: I’ve left the client_id as blank as it is better to create a random name using techniques like this:
t=int(time.time()) #remove decimal part t=str(t) #concert to string client_id=client1+t and client_id=client2+t etc
we add it to the dictionary using:
clients[0]["name"]=client_id
You could also add the client object using:
client=mqtt.Client(client_id) clients[0]["client"]=client
In that way you have references between the client object,broker,topics etc.
Demo script: The demo script demo1a.py uses an array and dictionary to store client data and is suitable when dealing with multiple brokers and topics. Otherwise it does the same as demo1.py
Method 2 – Using The loop() function
If we use the loop() function we can reduce the number of threads.
The demo script demo1b.py is the same as demo1a.py but uses loop() and not loop_start().
You should also note and test that the main while loop is a bad example when using the loop() call as the while loop has lots or sleep calls.
To counter this it is better to use a separate thread to call the while loop.
Here is the code:
def multi_loop(nclients): global run_flag while run_flag: for i in range(nclients): client=clients[i]["client"] client.loop(0.01)
To start in a new thread use:
t = threading.Thread(target=multi_loop,args=(nclients,True))#start multi loop t.start()
Method 3 – Using Threads for Each Client
If you need each client to be very responsive then you will need to start a thread for each client
The script demo2.py uses a different thread for each client connection.
The thread calls the client loop() function manually and there is no need for loop_start().
The script handles reconnects and will attempt 3 before stopping that client thread.
Total threads =number of clients+1 .
Note: If started using IDE then add another thread.
The clients do nothing at present to make them do something edit the pub function.
You can create a different pub function for each client if needed but you will need to modify how the threads are created if you do.
Course Links
- Introduction to the Paho Python MQTT Client
- Introduction to the Client Class
- Connecting to a Broker
- Publishing Using The Paho Python MQTT Client
- –Subscribing using The Paho Python Client
- Receiving Messages with the Paho MQTT Python Client
- Understanding The Loop
- Understanding Callbacks
- Handling Multiple Client Connections
Hi Steve,
I used your Multiple Clients script and found a bug in demo2-.py of the multi-clients.7z file.
The script checks a few times for the ‘client.bad_connection_flag’ value but it always stays ‘false’ because of the typo ‘client.badconnection_flag’ in the line ‘client.badconnection_flag=True’ in the ‘def Connect’ routine.
Regards,
Michel
Tks I’ll take a look
Rgds
Steve
On github, there is a project called “asyncio-mqtt”. Does this approach work well for handling multiple clients?
Yes. I did actually write a few scripts using asyncio it looks like this would make it easier than coding your own.I have never used it but might give it a try now that you have bought it to my attention.
Rgds
Steve
Multiple clientes will all receive the same message, right?
So they will process the topic N times, the same thing, right?
So I don’t get the use of threads, messages will not be segragated among the clients
Each client can connect to the same broker or different brokers and subscribe to the same topics or different topics.
Does that make sense?
Rgds
Steve
Muchas gracias Steve, saludos desde Chile.
Hi Steve,
Does Mosquitto client support redundant brokers. My broker has that kind of primary and backup broker configuration, can Mosquitto client maintain the connection like if primary downs client should connect to secondary and vice verse.
Regards,
Rahul
Yes you can create two client objects and connect one to one broker and the other to the secondary broker.
You can detect the connection status and send data to the connected broker. if both are ok you use the primary.
If you are stuck let me know and I’ll take a look.
Rgds
Steve
Thanks Steve. Let me check the same. Will let you know the outcome.
It worked. Thanks
well done
Rgds
Steve
Hi Steve, and thanks for your tutorial.
I try to write load test, there is a scenario: we make api request which triggers message come on mqtt, from this message we take specific info and after that publish message. I need a lot of clients and use your first approach for client creation. Script works, but sometimes I see random error and after it mqtt operations in all clients don’t work. Thread number in which exception occurs is always clients count I create. For example I run script for 10 clients and this Error:
Exception in thread Thread-10:
Traceback (most recent call last):
File “C:\…..\Programs\Python\Python39\lib\threading.py”, line 973, in _bootstrap_inner
self.run()
File “C:\…..\Programs\Python\Python39\lib\threading.py”, line 910, in run
self._target(*self._args, **self._kwargs)
File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 3591, in _thread_main
self.loop_forever(retry_first_connection=True)
File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 1756, in loop_forever
rc = self._loop(timeout)
File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 1181, in _loop
rc = self.loop_write()
File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 1577, in loop_write
rc = self._packet_write()
File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 2464, in _packet_write
write_length = self._sock_send(
File “C:\…..\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py”, line 649, in _sock_send
return self._sock.send(buf)
File “C:\…..\Programs\Python\Python39\lib\site-packages\gevent\_ssl3.py”, line 501, in send
return self._sslobj.write(data)
ssl.SSLError: [SSL: BAD_LENGTH] bad length (_ssl.c:2483)
Maybe you have an idea how to solve this issue? thanks
What script are you using?
rgds
steve
def on_connect(client, userdata, flags, rc):
if rc == 0:
print(“connected OK Returned code=”, rc)
else:
print(“Bad connection Returned code=”, rc)
def on_message(client, userdata, msg):
global messages
messages[msg.topic] = msg.payload
clients = []
number_of_clients = 1
mqtt.Client.connected_flag = False
for i in range(number_of_clients):
client_name = “Client” + str(i)
client = mqtt.Client(client_name)
clients.append(client)
for client in clients:
client.connect_async(“broker address”, port number)
client.loop_start()
client.on_connect = on_connect
client.on_message = on_message
client.tls_set(CA_CERT, DEVICE_CRT, PRIVATE_KEY)
client.tls_insecure_set(True)
def publish(payload, topic):
result = client.publish(topic, payload, 1)
result.wait_for_publish(40)
if result.is_published():
print(f”Message was sent to the topic `{topic}`”)
else:
print(f”Message was NOT published to the topic `{topic}`”)
it is code of mqtt_client.py.In my load test(which runs in multiple threads) I use publish method from it:
mqtt_client.publish(message, topic) and also subscribe( accessing one of client from mqtt_client.py like:)
mqtt_client.client.subscribe(“commands/{}”.format(self.gateway_id))
I’ve emailed you
what is the maximum number of clients you can connect to the Broker?
How many publishers can it support simultaneously?
How many subscribers can it support simultaneously?
Can I have many publishers and one subscriber (with subscription to many topics from multiple publishers)?
There is not limit in the protocol. The limit is with the broker hardware just as with http.
I would like to simulate a system with 100 subscribers and 100 publishers, I am using mosquitto as a broker and the clients are two python scripts. On the editors side I have managed to configure it with 100 editors, using python thread, but I have doubts about implementing the same in the subscribers part. I have seen these examples that you have here, they have helped me a lot to guide me, but if I want to execute the subscribers in another script apart, I know that I must make threads for each client, my question if it can be created in this way:
for client_id, client in all_clients:
t = threading.Thread (target = Create_connections, args = (client, broker, port))
t.start ()
all_threads.append (t)
# at the end wait for end of threads
for t in all_threads:
t.join ()
Looks ok but I haven’t done this for a while.
You don’t actually need 100 threads as they can all run in the same thread. Having separate threads should be quicker but it is not a must except when you need to connect to different brokers.
On the subscribe side I would just use one thread.
rgds
steve
Hi Steve awesome tutorials. I am currently having difficulty trying to publish gps data from a simulated drone to multiple other instances of drones using mqtt. I have managed to get it working to publish data to one other drone but struggling to scale this up to two drones or more. I feel my issue may be with the client id’s or rather struggling to get the listener drones to subscribe before the main one publishes its data. Do you have any recommendations or code examples that would be appropritate for this . Urgently need help with this for my thesis. Thanks and can’t wait to hear from you.
Hi
Use the ask steve page and send me the code you are currently using
http://www.steves-internet-guide.com/ask-steve/
rgds
steve
Hi Steve,
Is it possible we have one client which connects to two different broker (one is local and other is public)?
Not one client but a single script with two clients.
Rgds
Steve
Hello Steve,
For Performance test my devices (50K) needs to be in connected state for 24 Hours and all the devices should be subscribed to a topic.
The application will send the message to these devices in the topic subscribed. My latency calculation will be from the time of Application published the messages to Getting the message in the devices.Is it possible to achieve this scenario?
Thanks,
Jega
I have a test client that creates multiple connections. I got it to 1024 but the broker stopped accepting after that. It is a unix configuration issue to do with file handles that you need to change a setting but I never did and so I never tested beyond that.
However the client code was running on windows and didn’t complain.
You can obviously use multiple clients to achieve this level if needed.
Rgds
Steve
Hello Steve,
I have a javascript to parse the messages published by the client, which I invoke using
client.on(‘message’, abcfunc);
so how do I call this parser for each of the clientId seperately for it to be faster?
The easiest way is to start each connection using it’s own thread.
Rgds
Steve
Hello Steve,
this might be a bit off the MQTT track, but wanted to know if there is a way I can handle thread creation in javascript instead of python, or if I could call a javascript function from within python to reuse my code?
Hi
You could start the javascript code as an external program but that would be much slower.
From your query I assume you want to use a Javascript client is that correct? are you using nodejs?
Rgds
Steve
yes I am using Nodejs
I Haven’t tried it with nodejs I will try to give it a go.
Rgds
Steve
Hi Steve, thanks for the different scenarios and solutions provided in this code. Its been really helpful for persons that are new to these topics!
Im quite lost on which type of scenario would be helpful for my case.
Im trying to test an IoT platform that can handle multiple devices (my goal is to test 10K devices sending data to the same broker). These devices must send data every 10 seconds. Currently, Im using the code demo2.py so the responsiveness for each device is fast, but Im not sure if this can be scaled to 10K devices. Currently, Im being able to handle 200 devices connected and sending data every 10 seconds (with a little delay of 0.1 ms at most). But when I try to handle 1000 devices, I cannot connect to them and some of the return codes are 3, and eventually, the clients end up disconnecting. Using threads for each client is the best way to do it? Is there a limit to the amount of threads?
Any recommendations on how to approach my case?
You may find that the server is blocking the clients.
For 10k clients I would run the client code on several computers maybe 500 each
Rgds
Steve
I was looking for such a script that would put my IoT platform through a stress test.
I took the liberty to adapt the script “demo2.py” so that the connection is now encrypted and a message in JSON format with ID and timestamp is sent out.
thanks Steve without these tutorials I would never have finished programming my IoT-Platorm.
Code:
https://github.com/IoTree/IoTree42/tree/master/testing
There is also an easy-to-install IoT platform.
Well done
Rgds
steve
Hi Steve, your website is wonderfull to learn more about mqtt. My idea is to send message from mqtt python client over the mqtt broker to esp8266 board. Which implemtations steps are required on both side (python client) and the esp board ?
Hi
You just need a client for both systems. You will find a pub/sub client for python on the site.
http://www.steves-internet-guide.com/python-mqtt-publish-subscribe/
Rgds
Steve
Hi Steve
This is very helpful. Thank you!
I have to establish the version with Threading, so version 3 in your example.
I am able to connect and to disconnect.
But I cannot see that the on_message is called, means no message is printed at new messages.
Any idea what could be the problem?
Hi
This is almost always caused by a missing client loop. Check that it is being called for each client
rgds
steve
Hi Steve. Just now I found my issue. I had a missing subscribe. Thanks anyway.
Again. Your way to expalin in this tutorials is fantastic.
Hi Alex, I think having the same problem as you , connected but not displaying the messages. Can you further elaborate on how you solved this issue ?