How to Use the Python MQTT Client with a Backup Broker

Introduction

In this project we will develop Python scripts that use a backup broker to publish and receive messages, improving message reliability when a broker fails.

MQTT offers three QOS settings (0, 1 and 2), as described in the understanding QOS tutorial.

QOS levels 1 and 2 require an acknowledgement from the MQTT broker and provide for reliable message transfer.

However, there are many circumstances where relying on these QOS levels alone isn’t sufficient.

As an example, what happens if the broker goes down?

Messages that are in flight with a QOS of 1 or 2 will be queued for later delivery, but new messages will be discarded.

Although many cloud providers offer high-availability brokers, these options tend to be expensive.

A simple alternative is to publish to multiple brokers, as shown in the project diagram below:

Project Overview

[Figure: backup-broker-diagram]

In the first part of the project we will develop publishing scripts that send messages to two brokers, and then we will develop a script that receives messages from two brokers.

Publishing Messages

To do this we have three main options.

  1. Connect and publish to a primary broker, and connect to a second backup broker if the primary is unavailable.
  2. Publish to a primary broker, switch to a second backup broker on failure, and queue messages if necessary.
  3. Publish to both brokers and filter messages on the receiving client to avoid duplicates.
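
Option 3 needs some way to recognise a message that has already arrived via the other broker. The following is a minimal sketch, assuming each message carries a unique identifier such as the count value used later in this tutorial. The DuplicateFilter class and its max_size parameter are hypothetical names and are not part of the demo scripts.

```python
from collections import OrderedDict


class DuplicateFilter:
    """Remember recently seen message ids so that repeats can be dropped."""

    def __init__(self, max_size=1000):
        self.max_size = max_size
        self._seen = OrderedDict()  # behaves as an insertion-ordered set

    def is_new(self, msg_id):
        """Return True the first time msg_id is seen, False for a repeat."""
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = True
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # forget the oldest id
        return True


dedupe = DuplicateFilter(max_size=3)
# The same count arriving from broker 1 and then broker 2 is accepted once.
results = [dedupe.is_new(n) for n in (1, 1, 2, 3, 2, 4)]
print(results)
```

The size limit keeps memory use bounded, at the cost of accepting a duplicate that arrives after its id has been forgotten.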

The Python client will queue messages to send over a connection but only if that connection is up.

If the connection fails then messages sent for publishing are lost.

The following two demo clients illustrate how to use a backup broker when publishing.

Demo Client 1

This client attempts to connect to the primary broker on start.

If the primary broker is unavailable it will connect to the backup broker.

If both the primary and backup broker connections fail then the client exits and messages will be lost.

The connection code is shown below:

import sys

print("connecting to brokers ",broker1,"   ",broker2)
client.connected_flag=False
client.connection_failed_flag=False
#try broker 1 and if it fails try broker 2
try:
   client.connect(broker1,port1) #connect to the primary broker
   client.loop_start() #start the network loop in a background thread
except Exception as e: #e.g. connection refused or host unreachable
   print("client connection to broker 1 failed:",e)
   client.connection_failed_flag=True

if client.connection_failed_flag: #failed so try second broker
   client.connection_failed_flag=False #reset before the second attempt
   try:
      print("connecting to broker 2")
      client.connect(broker2,port2) #connect to the backup broker
      client.loop_start()
   except Exception as e:
      print("client connection to broker 2 failed:",e)
      client.connection_failed_flag=True
      sys.exit(1) #no point continuing
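
The same idea extends to any number of brokers tried in priority order. The sketch below is an illustration only: connect_first_available, the FakeClient stand-in and the example host names are hypothetical, and it assumes a client whose connect() raises an OSError subclass when the broker cannot be reached.

```python
def connect_first_available(client, brokers):
    """Try each (host, port) in order; return the pair that connected, or None."""
    for host, port in brokers:
        try:
            client.connect(host, port)
        except OSError as err:  # refused, unreachable, DNS failure, timeout
            print("connection to", host, "failed:", err)
            continue
        client.loop_start()  # start the network loop once connected
        return (host, port)
    return None


class FakeClient:
    """Stand-in for the MQTT client so the sketch runs without a broker."""

    def __init__(self, reachable):
        self.reachable = set(reachable)
        self.loop_started = False

    def connect(self, host, port):
        if host not in self.reachable:
            raise ConnectionRefusedError("no broker at " + host)

    def loop_start(self):
        self.loop_started = True


client = FakeClient(reachable={"backup.example"})
used = connect_first_available(
    client, [("primary.example", 1883), ("backup.example", 1883)]
)
print("connected to", used)
```

A return value of None corresponds to the case above where both connections fail and the script exits.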

Demo Client 2

This client attempts to connect to both brokers on start. It publishes to the primary broker by default and, in case of network failure, it switches to the backup broker.

If the primary broker comes back online it will switch back to using the primary broker.

If both the primary and backup broker connections fail then messages are queued.

When either connection becomes available it will publish the queued messages, and then continue to publish new messages.

The first part of the code establishes the connections and stores the connected clients in a list called clients:

clients=[]
if connect_broker(client1,broker1,port1):
   clients.append(client1)
else:
   print("failed to connect broker 1")
   #sys.exit(1)
if connect_broker(client2,broker2,port2):
   clients.append(client2)
else:
   print("failed to connect broker 2")
   #sys.exit(1)
print("Number of connected brokers ="+str(len(clients))) 
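
The connect_broker function is not shown in this tutorial. One plausible shape for it, given how it is called above, is sketched here. This is an assumption and not the code from the demo scripts: the keepalive parameter and the StubClient stand-in are hypothetical, and the real helper may also wait for the broker to acknowledge the connection.

```python
def connect_broker(client, broker, port, keepalive=60):
    """Return True if the connection attempt succeeded, otherwise False."""
    client.connected_flag = False
    client.connection_failed_flag = False
    try:
        client.connect(broker, port, keepalive)
    except OSError as err:
        print("connection to", broker, "failed:", err)
        client.connection_failed_flag = True
        return False
    client.loop_start()  # process network traffic in a background thread
    return True


class StubClient:
    """Stand-in for an MQTT client; refuses the hosts listed in `down`."""

    def __init__(self, down=()):
        self.down = set(down)

    def connect(self, host, port, keepalive=60):
        if host in self.down:
            raise ConnectionRefusedError("no broker at " + host)

    def loop_start(self):
        pass


client1 = StubClient(down={"primary.example"})
client2 = StubClient()
clients = []
for c, host in ((client1, "primary.example"), (client2, "backup.example")):
    if connect_broker(c, host, 1883):
        clients.append(c)
print("Number of connected brokers =", len(clients))
```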

Now we need to publish using one of the clients. Client 1 is preferred, so its connection is tested first; if it is OK it is used, otherwise client 2 is used.
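
The connection test relies on the connected_flag and connection_failed_flag attributes, which have to be kept up to date somewhere, normally in the connection callbacks. A minimal sketch follows, assuming the paho-mqtt 1.x callback signatures. The pick_client helper is hypothetical, and the demo scripts may maintain the flags differently.

```python
from types import SimpleNamespace


def on_connect(client, userdata, flags, rc):
    # rc == 0 means the broker accepted the connection
    client.connected_flag = (rc == 0)
    client.connection_failed_flag = (rc != 0)


def on_disconnect(client, userdata, rc):
    client.connected_flag = False


def pick_client(clients):
    """Return the first usable client in priority order, or None."""
    for c in clients:
        if c.connected_flag and not c.connection_failed_flag:
            return c
    return None


# Plain objects stand in for the two MQTT clients.
client1 = SimpleNamespace(name="primary", connected_flag=False,
                          connection_failed_flag=False)
client2 = SimpleNamespace(name="backup", connected_flag=False,
                          connection_failed_flag=False)
on_connect(client1, None, {}, 0)
on_connect(client2, None, {}, 0)
first = pick_client([client1, client2]).name
on_disconnect(client1, None, 1)  # the primary broker goes away
second = pick_client([client1, client2]).name
print(first, second)
```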

Before publishing we attempt to empty the queue to keep the messages in the correct order:

while True:
   count+=1
   #we only publish the count value; we try to empty the queue before
   #publishing the latest message
   time.sleep(1)
   if not client1.connection_failed_flag and client1.connected_flag:
      if len(queue)>0:
         empty_queue(client1,queue) #send queued messages first
         queue=[] #clear now sent
      res,mid=client1.publish(topic,count,1) #publish to the primary broker
   elif not client2.connection_failed_flag and client2.connected_flag:
      if len(queue)>0:
         empty_queue(client2,queue)
         queue=[] #clear now sent
      res,mid=client2.publish(topic,count,1) #publish to the backup broker
   else:
      queue.append(count) #no connection so queue the message
      print("Queue length="+str(len(queue)))

   if count==10000:
      count=0
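
The empty_queue function is also not shown. A rough sketch of what such a helper might do is given below. It is an assumption and not the demo code: the topic and qos parameters, the returned remainder and the FakeClient stand-in are hypothetical. It relies only on publish() returning a result code and message id, as in the loop above.

```python
def empty_queue(client, queue, topic, qos=1):
    """Publish queued values oldest first; return the values still unsent."""
    while queue:
        rc, mid = client.publish(topic, queue[0], qos)
        if rc != 0:  # 0 is MQTT_ERR_SUCCESS in paho-mqtt
            break  # connection dropped again, keep the rest queued
        queue.pop(0)
    return queue


class FakeClient:
    """Stand-in that accepts a fixed number of publishes, then reports failure."""

    def __init__(self, accept):
        self.accept = accept
        self.sent = []

    def publish(self, topic, payload, qos):
        if len(self.sent) >= self.accept:
            return (4, 0)  # 4 is MQTT_ERR_NO_CONN in paho-mqtt
        self.sent.append(payload)
        return (0, len(self.sent))


queue = [7, 8, 9]
client = FakeClient(accept=2)
remaining = empty_queue(client, queue, "demo/count")
print(client.sent, remaining)
```

Returning the unsent values lets the caller keep them queued instead of clearing the queue unconditionally.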

Note: This only works as described if both client connections are established at the start. Reconnecting while the main script is running would require threads so as not to delay the main loop. However, a simple fix is not to start unless both connections succeed.
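
The threaded approach mentioned in the note could look roughly like this. It is a sketch only: connect_in_background, its parameters and the FlakyClient stand-in are hypothetical, and it assumes the client exposes the connect() and loop_start() methods used earlier.

```python
import threading


def connect_in_background(client, broker, port, retry_interval=5.0,
                          stop_event=None):
    """Keep trying to connect in a worker thread so the main loop never blocks."""
    stop_event = stop_event or threading.Event()

    def worker():
        while not stop_event.is_set():
            try:
                client.connect(broker, port)
            except OSError:
                stop_event.wait(retry_interval)  # sleep, but wake early on stop
                continue
            client.loop_start()
            return

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread, stop_event


class FlakyClient:
    """Stand-in whose first `failures` connection attempts are refused."""

    def __init__(self, failures):
        self.failures = failures
        self.attempts = 0
        self.loop_started = False

    def connect(self, host, port):
        self.attempts += 1
        if self.attempts <= self.failures:
            raise ConnectionRefusedError("broker down")

    def loop_start(self):
        self.loop_started = True


client = FlakyClient(failures=2)
thread, stop = connect_in_background(client, "backup.example", 1883,
                                     retry_interval=0.01)
thread.join(timeout=2)
print(client.attempts, client.loop_started)
```

The paho-mqtt client also provides a connect_async() method for use with loop_start(), which may make a hand-written thread unnecessary.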

Duplicate Messages

Because the client sends to either broker 1 or broker 2, but never both, there isn’t really a problem with duplicate messages on the receiving side.

Receiving Messages

To receive the messages we simply subscribe to both brokers using the same topics as the publishing clients.

This client attempts to connect to both brokers on start.

If both the primary and backup broker connections fail then the script exits.
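
Subscribing from inside the on_connect callback means the subscription is made again after any reconnect. The sketch below shows one way to give both clients the same subscription, assuming the paho-mqtt 1.x on_connect signature. The make_on_connect factory, the StubClient stand-in and the topic name are hypothetical.

```python
def make_on_connect(topic, qos=1):
    """Build an on_connect callback that subscribes whenever the client connects."""
    def on_connect(client, userdata, flags, rc):
        if rc == 0:  # only subscribe once the broker has accepted us
            client.subscribe(topic, qos)
    return on_connect


class StubClient:
    """Stand-in for an MQTT client that records its subscriptions."""

    def __init__(self):
        self.subscriptions = []
        self.on_connect = None

    def subscribe(self, topic, qos=0):
        self.subscriptions.append((topic, qos))


client1, client2 = StubClient(), StubClient()
for c in (client1, client2):
    c.on_connect = make_on_connect("demo/count")
    c.on_connect(c, None, {}, 0)  # simulate a successful connection
client2.on_connect(client2, None, {}, 0)  # simulate a reconnect to broker 2
print(client1.subscriptions, client2.subscriptions)
```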

The first part of the code establishes the connections and stores the connected clients in a list called clients:

clients=[]
if connect_broker(client1,broker1,port1):
   clients.append(client1)
else:
   print("failed to connect broker 1")
   #sys.exit(1)
if connect_broker(client2,broker2,port2):
   clients.append(client2)
else:
   print("failed to connect broker 2")
   #sys.exit(1)
print("Number of connected brokers ="+str(len(clients))) 

We now need an on_message callback to process the received data.

Because the publish code sends to only one broker at any time, it doesn’t matter which connection receives the messages.

The on_message callback simply extracts the data and sends it to the process_message function:

def on_message(client, userdata, message):
   msg=message.payload.decode("utf-8")
   process_message(msg)

The process_message function parses the JSON payload, checks the message counter and flags an error if it is not the expected value:

def process_message(msg):
   global rx_count
   msg=json.loads(msg)
   count=msg["count"]
   print("Count:%s " % rx_count, end='')
   if count!=rx_count:
      print("error count=",count)
      rx_count=count+1
   else:
      rx_count+=1
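
The publish loop shown earlier resets its counter after 10000, so a receiver that simply adds one to the last count will report an error at that point. The following sketch handles the wrap and avoids the global variable. The SequenceChecker class and next_expected function are hypothetical, and the sketch assumes the payload is a JSON object with a count field, as in process_message above.

```python
import json

WRAP_AT = 10000  # the publish loop resets its counter after this value


def next_expected(count, wrap_at=WRAP_AT):
    """Return the count that should follow `count` in the published sequence."""
    return 1 if count >= wrap_at else count + 1


class SequenceChecker:
    def __init__(self):
        self.expected = None  # unknown until the first message arrives
        self.gaps = 0

    def check(self, payload):
        """Return True if the message arrived in sequence."""
        count = json.loads(payload)["count"]
        in_sequence = self.expected is None or count == self.expected
        if not in_sequence:
            self.gaps += 1
        self.expected = next_expected(count)  # resynchronise on this message
        return in_sequence


checker = SequenceChecker()
payloads = [json.dumps({"count": n}) for n in (9999, 10000, 1, 2, 4)]
results = [checker.check(p) for p in payloads]
print(results, "gaps:", checker.gaps)
```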

Note: The receive code works with both the Demo Client 1 and Demo Client 2 publish scripts.

Demo scripts download

The link contains both publish demo scripts and the receive script. You need to start the receive script before you publish.

download
