Introduction
In this project we will develop Python scripts that use a backup broker to publish and receive messages, improving message reliability in failure situations.
MQTT offers three QoS levels (0, 1 and 2), as described in the understanding QoS tutorial.
QoS levels 1 and 2 require an acknowledgement from the MQTT broker and provide reliable message transfer.
However, there are many circumstances where relying on these QoS levels alone isn't sufficient.
As an example, what happens if the broker goes down?
Messages that are in flight with a QoS of 1 or 2 will be queued for later delivery, but new messages will be discarded.
Although many cloud providers offer high-availability brokers, these options tend to be expensive.
A simple alternative is to publish to multiple brokers as shown in the diagram below:
The Project
In the first part of the project we will develop publishing scripts to send messages to two brokers, and then we will develop scripts to receive messages from two brokers.
Publishing Messages - Part 1
To do this we have three main options.
- Connect and publish to a primary broker, and connect to a second backup broker if the primary is unavailable.
- Publish to a primary broker, switch to a second backup broker on failure, and queue messages if necessary.
- Publish to both brokers and filter messages on the client to avoid duplicates.
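The third option depends on the receiving client being able to recognise a message it has already seen. The sketch below is one possible way to do that, assuming each message carries a unique id or sequence number in its payload (an assumption, since the demo scripts here only publish a counter). The class name DuplicateFilter and the max_ids limit are hypothetical.

```python
from collections import OrderedDict


class DuplicateFilter:
    """Remember the most recent message ids and reject repeats."""

    def __init__(self, max_ids=1000):
        self.max_ids = max_ids          # bound memory use on long-running clients
        self._seen = OrderedDict()      # insertion order lets us drop the oldest id

    def is_new(self, msg_id):
        if msg_id in self._seen:
            return False                # already delivered by the other broker
        self._seen[msg_id] = True
        if len(self._seen) > self.max_ids:
            self._seen.popitem(last=False)  # forget the oldest id
        return True


dedupe = DuplicateFilter()
# Each message arrives once per broker, so most ids appear twice.
arrivals = [("broker1", 1), ("broker2", 1), ("broker1", 2),
            ("broker2", 2), ("broker2", 3)]
accepted = [msg_id for _, msg_id in arrivals if dedupe.is_new(msg_id)]
print(accepted)
```

In a subscriber, is_new would typically be called at the top of the message callback, and the message ignored when it returns False.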
The Python client will queue messages to send over a connection but only if that connection is up.
If the connection fails then messages sent for publishing are lost.
The following two demo clients illustrate how to use a backup broker.
Demo Client 1
This client attempts to connect to the primary broker on start.
If the primary broker is unavailable it will connect to the backup broker.
If both the primary and backup broker connections fail then the client exits and messages will be lost.
The connection code is shown below:
    import sys

    # excerpt: client, broker1, port1, broker2 and port2 are defined earlier in the script
    print("connecting to brokers ", broker1, " ", broker2)
    client.connected_flag = False
    client.connection_failed_flag = False
    # try broker 1 and if it fails try broker 2
    try:
        client.connect(broker1, port1)  # connect
        client.loop_start()
    except Exception:
        print("client connection to broker 1 failed")
        client.connection_failed_flag = True
    if client.connection_failed_flag:  # failed so try second broker
        client.connection_failed_flag = False  # reset before the second attempt
        try:
            print("connecting to broker 2")
            client.connect(broker2, port2)  # connect
            client.loop_start()
        except Exception:
            print("client connection to broker 2 failed")
            client.connection_failed_flag = True
            sys.exit(1)  # no point continuing
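The same try-primary-then-backup logic can be written as a loop over a list of brokers, which avoids repeating the try/except block. This is only a sketch: connect_first_available, fake_connect and the example.com host names are hypothetical, and in a real script the connect argument would wrap the client's connect(host, port) call.

```python
def connect_first_available(brokers, connect):
    """Try each (host, port) in order; return the first that connects, else None."""
    for host, port in brokers:
        try:
            connect(host, port)
        except (OSError, ValueError) as err:
            print("connection to", host, "failed:", err)
            continue                    # move on to the next broker in the list
        return host, port
    return None


# Stand-in for client.connect so the sketch runs without a broker:
# only the backup host is treated as reachable.
def fake_connect(host, port):
    if host != "backup.example.com":
        raise ConnectionRefusedError("broker unavailable")


brokers = [("primary.example.com", 1883), ("backup.example.com", 1883)]
chosen = connect_first_available(brokers, fake_connect)
print("connected to", chosen)
```

A return value of None corresponds to the case above where both connections fail and the client exits.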
Demo Client 2
This client attempts to connect to both brokers on start. It publishes to the primary broker by default, and in case of network failure it switches to the backup broker.
If the primary broker comes back online it will switch back to using the primary broker.
If both the primary and backup broker connections fail then messages are queued.
When either connection becomes available it will publish the queued messages, and then continue to publish new messages.
The first part of the code establishes the connections and stores the connected clients in a list called clients:
    clients = []
    if connect_broker(client1, broker1, port1):
        clients.append(client1)
    else:
        print("failed to connect broker 1")
        # sys.exit(1)
    if connect_broker(client2, broker2, port2):
        clients.append(client2)
    else:
        print("failed to connect broker 2")
        # sys.exit(1)
    print("Number of connected brokers =" + str(len(clients)))
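The connect_broker helper is not listed here. A minimal sketch of what it might look like is given below, assuming it only needs to open the connection, start the network loop and report success. The keepalive parameter and the StubClient class are assumptions added so the example runs without a broker; a real script would pass paho client objects instead.

```python
def connect_broker(client, broker, port, keepalive=60):
    """Return True if the connection attempt to the broker did not raise."""
    client.connected_flag = False
    client.connection_failed_flag = False
    try:
        client.connect(broker, port, keepalive)
    except (OSError, ValueError) as err:
        print("connection to", broker, "failed:", err)
        client.connection_failed_flag = True
        return False
    client.loop_start()  # run the network loop in a background thread
    return True


class StubClient:
    """Stand-in for an MQTT client so the sketch runs without a broker."""

    def __init__(self, reachable):
        self.reachable = reachable
        self.loop_running = False

    def connect(self, host, port=1883, keepalive=60):
        if not self.reachable:
            raise ConnectionRefusedError("broker unavailable")

    def loop_start(self):
        self.loop_running = True


client1, client2 = StubClient(reachable=True), StubClient(reachable=False)
clients = [c for c, host in ((client1, "broker1"), (client2, "broker2"))
           if connect_broker(c, host, 1883)]
print("Number of connected brokers =" + str(len(clients)))
```

Note that a True result only means the connection attempt did not raise an error. Setting connected_flag is assumed to happen in the client's connect callback, which is not shown.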
Now we need to publish using one of the clients. Client 1 is preferred, so its connection is tested first; if it is OK it is used, otherwise client 2 is used.
Before publishing we attempt to empty the queue to keep the messages in the correct order:
    while True:
        count += 1  # we are only publishing the count value
        # we try to empty the queue before publishing the latest message
        time.sleep(1)
        if not client1.connection_failed_flag and client1.connected_flag:
            if len(queue) > 0:
                empty_queue(client1, queue)
                queue = []  # clear now sent
            res, mid = client1.publish(topic, count, 1)  # publish
        elif not client2.connection_failed_flag and client2.connected_flag:
            if len(queue) > 0:
                empty_queue(client2, queue)
                queue = []  # clear now sent
            res, mid = client2.publish(topic, count, 1)  # publish
        else:
            queue.append(count)
            print("Queue length=" + str(len(queue)))
        if count == 10000:
            count = 0
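The empty_queue helper is called in the loop above but is not listed. One possible version is sketched below: it publishes the queued values oldest first so that ordering is preserved. The topic default, the RecordingClient stub and the returned list of unsent values are assumptions for illustration; the loop above simply clears the queue after the call.

```python
from collections import namedtuple

MQTT_ERR_SUCCESS = 0  # same value as paho.mqtt.client.MQTT_ERR_SUCCESS
MQTT_ERR_NO_CONN = 4  # same value as paho.mqtt.client.MQTT_ERR_NO_CONN

PublishInfo = namedtuple("PublishInfo", ["rc", "mid"])


def empty_queue(client, queue, topic="test/count", qos=1):
    """Publish queued payloads oldest first; return any that were not sent."""
    for index, payload in enumerate(queue):
        info = client.publish(topic, payload, qos)
        if info.rc != MQTT_ERR_SUCCESS:
            return queue[index:]        # keep the rest for a later attempt
    return []


class RecordingClient:
    """Stand-in client that records payloads and can simulate a dropped link."""

    def __init__(self, fail_after=None):
        self.sent = []
        self.fail_after = fail_after

    def publish(self, topic, payload, qos=0):
        if self.fail_after is not None and len(self.sent) >= self.fail_after:
            return PublishInfo(rc=MQTT_ERR_NO_CONN, mid=0)
        self.sent.append(payload)
        return PublishInfo(rc=MQTT_ERR_SUCCESS, mid=len(self.sent))


client = RecordingClient()
leftover = empty_queue(client, [5, 6, 7])
print("sent:", client.sent, "unsent:", leftover)

flaky = RecordingClient(fail_after=1)
leftover_flaky = empty_queue(flaky, [5, 6, 7])
print("sent:", flaky.sent, "unsent:", leftover_flaky)
```

Returning the unsent values is a design choice: assigning the result back to the queue, instead of clearing it unconditionally, would avoid losing messages if the connection drops part way through.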
Note: this only works as described if both client connections are established at the start. Reconnecting while the main script is running would require threads, so as not to delay the main loop. However, a simple fix is not to start unless both connections succeed at startup.
Demo scripts download
The link contains both demo scripts.