Using the Python MQTT client with a Backup Broker

Introduction

In this project we will develop Python scripts that use a backup broker to publish and receive messages, in order to improve message reliability when a broker fails.

MQTT offers three QOS settings (0, 1 and 2), as described in the understanding QOS tutorial.

QOS levels 1 and 2 require an acknowledgement from the MQTT broker and provide for reliable message transfer.

However, there are many circumstances where relying on these QOS levels alone isn't sufficient.

As an example, what happens if the broker goes down?

Messages that are already in flight with a QOS of 1 or 2 will be queued for later delivery, but new messages will be discarded.

Although many cloud providers offer high availability brokers, these options tend to be expensive.

A simple alternative is to publish to multiple brokers as shown in the diagram below:

[Diagram: backup-broker-diagram]

The Project

In the first part of the project we will develop publishing scripts to send messages to two brokers, and then we will develop scripts to receive messages from two brokers.

Publishing Messages - Part 1

To do this we have three main options.

  1. Connect and Publish to a primary broker and connect to a second backup broker if the primary is unavailable.
  2. Publish to a primary broker and switch to a second backup broker on fail and queue if necessary.
  3. Publish to both brokers and filter messages on the client to avoid duplicates.
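
Option 3 needs some way of recognising the same message when it arrives a second time via the other broker. The following is only a minimal sketch of one way to do that. It assumes each message carries a unique id (here just the message value itself), and the class name DuplicateFilter and the max_ids limit are made up for illustration and are not part of the demo scripts.

```python
from collections import OrderedDict


class DuplicateFilter:
    """Remember recently seen message ids so a second copy can be dropped."""

    def __init__(self, max_ids=1000):
        self.max_ids = max_ids      # how many ids to remember (assumed limit)
        self._seen = OrderedDict()  # ids in the order they were first seen

    def is_new(self, msg_id):
        """Return True the first time msg_id is seen, False for a repeat."""
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = True
        if len(self._seen) > self.max_ids:
            self._seen.popitem(last=False)  # forget the oldest id
        return True


dup_filter = DuplicateFilter(max_ids=3)
# the same three messages arriving via broker 1 and then via broker 2
arrivals = [1, 2, 3, 1, 2, 3]
accepted = [m for m in arrivals if dup_filter.is_new(m)]
print(accepted)  # prints [1, 2, 3]
```

In a real subscriber the is_new check would sit in the on_message callback, and the id would need to be something both copies share, such as a counter placed in the payload by the publisher.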

The Python client will queue messages to send over a connection but only if that connection is up.

If the connection fails then messages sent for publishing are lost.
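
One way to notice a lost message is to check the result code that publish returns. The sketch below assumes publish returns a (result, mid) pair, which is how it is used in the publish loop later in this tutorial. The function name publish_checked is hypothetical, and MagicMock objects stand in for real clients so that the example runs without a broker.

```python
from unittest.mock import MagicMock

MQTT_ERR_SUCCESS = 0  # same value as paho.mqtt.client.MQTT_ERR_SUCCESS


def publish_checked(client, topic, payload, qos=1):
    """Publish and report whether the client accepted the message."""
    res, mid = client.publish(topic, payload, qos)
    if res != MQTT_ERR_SUCCESS:
        print("publish failed, result code", res)
        return False
    return True


# stand-in clients: one with a working connection and one without
up = MagicMock()
up.publish.return_value = (0, 1)
down = MagicMock()
down.publish.return_value = (4, 1)  # 4 is the value of paho's MQTT_ERR_NO_CONN

up_ok = publish_checked(up, "test/topic", 1)
down_ok = publish_checked(down, "test/topic", 2)
print(up_ok, down_ok)
```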

The following two demo clients illustrate how to use a backup broker.

Demo Client 1

This client attempts to connect to the primary broker on start.

If the primary broker is unavailable it will connect to the backup broker.

If both the primary and backup broker connections fail then the client exits and messages will be lost.

The connection code is shown below:

import sys #needed for sys.exit below

print("connecting to brokers ",broker1,"   ",broker2)
client.connected_flag=False
client.connection_failed_flag=False
#try broker 1 and if it fails try broker 2
try:
   client.connect(broker1,port1) #connect to the primary broker
   client.loop_start()
except Exception: #avoid a bare except so Ctrl-C still stops the script
   print("client connection to broker 1 failed")
   client.connection_failed_flag=True

if client.connection_failed_flag: #primary failed so try the backup broker
   try:
      print("connecting to broker 2")
      client.connect(broker2,port2) #connect to the backup broker
      client.loop_start()
   except Exception:
      print("client connection to broker 2 failed")
      client.connection_failed_flag=True
      sys.exit(1) #no point continuing
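
The same try-the-next-broker idea can be written as a loop, which also works for more than two brokers. This is only a sketch and not the code from the demo script. The function name connect_first_available and the host names are made up, it catches OSError on the assumption that a failed network connection raises one of its subclasses, and a MagicMock stands in for the real client.

```python
from unittest.mock import MagicMock


def connect_first_available(client, brokers):
    """Try each (host, port) in order; return the one that worked or None."""
    for host, port in brokers:
        try:
            client.connect(host, port)
            client.loop_start()
            return (host, port)
        except OSError as err:
            print("client connection to", host, "failed:", err)
    return None


def fake_connect(host, port):
    """Pretend that only the backup broker is reachable."""
    if host != "backup.example":
        raise ConnectionRefusedError("no broker at " + host)


client = MagicMock()  # stand-in for a paho client
client.connect.side_effect = fake_connect

brokers = [("primary.example", 1883), ("backup.example", 1883)]
chosen = connect_first_available(client, brokers)
print("connected to", chosen)
```

If the function returns None then no broker could be reached and the script can exit as before.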

Demo Client 2

This client attempts to connect to both brokers on start. It publishes to the primary broker by default and in case of network failure it switches to the backup broker.

If the primary broker comes back online it will switch back to using the primary broker.

If both the primary and backup broker connections fail then messages are queued.

When either connection becomes available it will publish the queued messages, and then continue to publish new messages.

The first part of the code establishes the connections and stores the connected clients in a list called clients:

clients=[]
if connect_broker(client1,broker1,port1):
   clients.append(client1)
else:
   print("failed to connect broker 1")
   #sys.exit(1)
if connect_broker(client2,broker2,port2):
   clients.append(client2)
else:
   print("failed to connect broker 2")
   #sys.exit(1)
print("Number of connected brokers ="+str(len(clients))) 
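
The connect_broker function is part of the download and isn't shown above. A rough sketch of what such a helper could look like is given below. The callback bodies and the way the two flags are set are assumptions based on how the flags are used in this tutorial, not the actual code. The extra properties and *args parameters are only there so the callbacks accept the arguments passed by both older and newer versions of the client.

```python
from unittest.mock import MagicMock


def on_connect(client, userdata, flags, rc, properties=None):
    """Record the outcome of the connection attempt on the client object."""
    if rc == 0:
        client.connected_flag = True
        client.connection_failed_flag = False
    else:
        client.connection_failed_flag = True


def on_disconnect(client, userdata, *args):
    """Mark the client as down so the publish loop stops using it."""
    client.connected_flag = False


def connect_broker(client, broker, port):
    """Return True if the connection attempt could be started."""
    client.connected_flag = False
    client.connection_failed_flag = False
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    try:
        client.connect(broker, port)
        client.loop_start()
    except OSError:
        client.connection_failed_flag = True
        return False
    return True


good = MagicMock()  # stand-in client whose connect succeeds
bad = MagicMock()   # stand-in client whose broker is down
bad.connect.side_effect = ConnectionRefusedError()

started_good = connect_broker(good, "primary.example", 1883)
started_bad = connect_broker(bad, "backup.example", 1883)
on_connect(good, None, {}, 0)  # simulate the broker accepting the connection
print(started_good, started_bad, good.connected_flag)
```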

Now we need to publish using one of the clients. Client 1 is preferred, so its connection is tested and if it is OK it is used; otherwise client 2 is used.

Before publishing we attempt to empty the queue to keep the messages in the correct order:

import time

count=0
queue=[] #messages waiting for a working connection
while True:
   count+=1
   #we are only publishing the count value; we try to empty the queue
   #before publishing the latest message
   time.sleep(1)
   if not client1.connection_failed_flag and client1.connected_flag:
      if len(queue)>0:
         empty_queue(client1,queue)
         queue=[] #clear now sent
      res,mid=client1.publish(topic,count,1) #publish to the primary broker
   elif not client2.connection_failed_flag and client2.connected_flag:
      if len(queue)>0:
         empty_queue(client2,queue)
         queue=[] #clear now sent
      res,mid=client2.publish(topic,count,1) #publish to the backup broker
   else:
      queue.append(count) #no connection available so hold the message
      print("Queue length="+str(len(queue)))

   if count==10000:
      count=0
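
The empty_queue function is also in the download rather than shown here. A possible version is sketched below as an illustration only. The topic default and the returned count are assumptions added for the example, and a MagicMock stands in for a connected client.

```python
from unittest.mock import MagicMock


def empty_queue(client, queue, topic="test/backup", qos=1):
    """Publish queued values oldest first; return how many were sent."""
    sent = 0
    for value in queue:
        res, mid = client.publish(topic, value, qos)
        if res != 0:  # 0 is the value of paho's MQTT_ERR_SUCCESS
            break     # connection dropped again so stop here
        sent += 1
    return sent


client = MagicMock()  # stand-in for a connected paho client
client.publish.return_value = (0, 1)

queue = [5, 6, 7]
sent = empty_queue(client, queue)
del queue[:sent]  # keep anything that could not be sent
print("sent", sent, "left in queue", len(queue))
```

Returning the number sent lets the caller remove only those messages, so anything still unsent stays queued if the connection fails part way through.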

Note: This only works as described if both client connections are established at the start. Reconnecting while the main script is running would require using threads so as not to cause a delay in the main loop. However, a simple fix is not to start unless both connections succeed on start.
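
One possible way to avoid blocking the main loop is to let the client's own network thread make the connection, using connect_async followed by loop_start. The sketch below has not been tested against the demo scripts. The function name start_background_connect is made up, and it assumes an on_connect callback sets connected_flag once the broker accepts the connection. A MagicMock stands in for the real client.

```python
from unittest.mock import MagicMock


def start_background_connect(client, broker, port):
    """Ask the client to connect from its network thread and return at once."""
    client.connected_flag = False
    client.connection_failed_flag = False
    client.connect_async(broker, port)  # stores the details without waiting
    client.loop_start()                 # network thread makes the connection


client = MagicMock()  # stand-in for a paho client
start_background_connect(client, "primary.example", 1883)
print("main loop can start while the connection is attempted")
```

With this approach the publish loop would keep queueing messages until connected_flag becomes true for one of the clients.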

Demo scripts download

The link contains both demo scripts.

download

Related Tutorials and Resources:

Project Page

 
