How to Send a File Using MQTT and Python

In this tutorial we will look at how to send a file or picture using MQTT.

The script is written in Python and the approach I used was to send the file as bytes.

There is an IBM tutorial which does the same thing but uses ASCII characters.

The advantage of the ASCII approach is that you can easily add extra information and put the data into a JSON string for sending.
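
For comparison, here is a rough sketch of what the ASCII/JSON approach could look like. This is not the IBM tutorial's code: base64 is assumed as the text encoding, and the field names (filename, seq, data) are hypothetical, chosen only for illustration.

```python
import base64
import json

def encode_chunk(filename, seq, chunk):
    """Wrap a chunk of raw bytes in a JSON string (field names are made up)."""
    return json.dumps({
        "filename": filename,
        "seq": seq,
        # base64 turns arbitrary bytes into plain ASCII text
        "data": base64.b64encode(chunk).decode("ascii"),
    })

def decode_chunk(payload):
    """Reverse encode_chunk: return (filename, seq, raw bytes)."""
    msg = json.loads(payload)
    return msg["filename"], msg["seq"], base64.b64decode(msg["data"])

payload = encode_chunk("photo.jpg", 0, b"\xff\xd8\xff\xe0")
name, seq, raw = decode_chunk(payload)
```

The trade-off is size: base64 makes each payload roughly a third larger than the raw bytes, which is one reason to send bytes directly.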





To mark the start and end of the file I use a header block and an end block, each a frame of 200 bytes.
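
As a small sketch of the framing idea, the helper below builds a 200-byte frame from fields separated by ",," and padded with commas, which is the layout the script uses. The helper name make_frame and the size check are my additions for illustration and are not part of the script.

```python
FRAME_SIZE = 200  # header and end frames are always exactly this long

def make_frame(kind, filename, extra=""):
    """Build a 'header' or 'end' frame: fields joined by ',,', padded with ','."""
    frame = bytearray(",,".join([kind, filename, extra]), "utf-8")
    if len(frame) > FRAME_SIZE:
        raise ValueError("fields do not fit in a %d-byte frame" % FRAME_SIZE)
    frame.extend(b"," * (FRAME_SIZE - len(frame)))
    return bytes(frame)

header = make_frame("header", "passwords.py")
```

Because the receiver recognises these frames by their length, a data chunk that happens to be exactly 200 bytes long goes through the same check.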

The send format is:

  • Send the header frame.
  • Loop: read data from disk in chunks of bytes and send each chunk, updating the file hash as you go.
  • Send the end block containing the file hash when done.
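
The send steps above can be sketched without any MQTT code as a generator that yields each payload in order. This is a simplified illustration, not the script itself: the function name frames_for_file is hypothetical, and it assumes the file name is short enough to fit in the 200-byte frame.

```python
import hashlib

def frames_for_file(path, block_size=2000, frame_size=200):
    """Yield the header frame, each data chunk, then the end frame."""
    def pad(text):
        # pad with commas up to the fixed frame size
        raw = text.encode("utf-8")
        return raw + b"," * (frame_size - len(raw))

    md5 = hashlib.md5()
    yield pad("header,," + path + ",,")
    with open(path, "rb") as f:
        while True:
            chunk = f.read(block_size)
            if not chunk:
                break
            md5.update(chunk)  # hash covers the file data only
            yield chunk
    yield pad("end,," + path + ",," + md5.hexdigest())
```

Each yielded payload could then be handed to a publish call, for example the script's c_publish(client,topic,payload,qos).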

The receive format is:

  • Read in data and extract the header block.
  • Look for the end block while receiving data.
  • Save the file data to disk and update the file hash.
  • When the end block is detected, compare the file hash it contains with the one calculated at the receiver to verify success.
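
The receive steps above can be sketched as a small class that is fed one payload at a time. This is an illustration under assumptions rather than the script's code: the class name FileReceiver is hypothetical, and it treats a 200-byte payload that is not valid UTF-8 as ordinary file data.

```python
import hashlib

class FileReceiver:
    """Collect payloads for one file and check the MD5 from the end frame."""

    def __init__(self, out, frame_size=200):
        self.out = out              # writable binary file object
        self.frame_size = frame_size
        self.md5 = hashlib.md5()
        self.ok = None              # stays None until the end frame arrives

    def feed(self, payload):
        if len(payload) == self.frame_size:
            try:
                fields = payload.decode("utf-8").split(",,")
            except UnicodeDecodeError:
                fields = [""]       # binary data that happens to be frame-sized
            if fields[0] == "header":
                return
            if fields[0] == "end" and len(fields) > 2:
                # compare the sender's hash with the one calculated here
                self.ok = (fields[2] == self.md5.hexdigest())
                return
        # anything else is file data: hash it and save it
        self.md5.update(payload)
        self.out.write(payload)
```

In an MQTT client, feed would be called with message.payload from the on_message callback.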

Notes:

  1. The receiver doesn’t extract the file name from the header, so the received file isn’t created with the same name as the sent file.
  2. I used a QoS of 1 and waited for a PUBACK before sending the next data block, to be nice to the MQTT broker and to help guarantee a successful send.
  3. I don’t detect network failures in this script.
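
For note 1, a receiver could pull the file name out of the header frame along these lines. This is only a sketch: the function name filename_from_header is hypothetical, and keeping just the last path component is my own precaution rather than something the script does.

```python
import os

def filename_from_header(frame):
    """Return the file name carried in a header frame, or None if it is not one."""
    try:
        fields = frame.decode("utf-8").split(",,")
    except UnicodeDecodeError:
        return None
    if len(fields) < 2 or fields[0] != "header":
        return None
    # Keep only the final path component so the sender cannot choose the directory.
    return os.path.basename(fields[1]) or None

header = "header,,passwords.py,,".encode("utf-8")
frame = header + b"," * (200 - len(header))
name = filename_from_header(frame)
```

One limitation is that a file name that itself contains ",," would be split in the wrong place.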

The complete script is shown below. Code notes follow the code.

#! python3.4
###demo code provided by Steve Cope at www.steves-internet-guide.com
##email steve@steves-internet-guide.com
###Free to use for any purpose
"""
Send File Using MQTT
"""
import time
import paho.mqtt.client as paho
import hashlib
broker="broker.hivemq.com"
broker="iot.eclipse.org"
broker="192.168.1.206"
#filename="DSCI0027.jpg"
filename="passwords.py" #file to send
topic="data/files"
qos=1
data_block_size=2000
fo=open(filename,"rb")
fout=open("1out.txt","wb") #use a different filename
# for outfile as I'm running sender and receiver together
def process_message(msg):
   """ This is the main receiver code.
   Returns True if msg is file data that should be written to disk.
   """
   if len(msg)==200: #is header or end
      try:
         msg_in=msg.decode("utf-8").split(",,")
      except UnicodeDecodeError:
         msg_in=[""] #200 bytes of binary file data, not a frame
      if msg_in[0]=="end": #is it really last packet?
         in_hash_final=in_hash_md5.hexdigest()
         if in_hash_final==msg_in[2]:
            print("File copied OK -valid hash  ",in_hash_final)
         else:
            print("Bad file receive   ",in_hash_final)
         return False
      else:
         if msg_in[0]!="header":
            in_hash_md5.update(msg)
            return True
         else:
            return False
   else:
      in_hash_md5.update(msg)
      return True
#define callback
def on_message(client, userdata, message):
   time.sleep(1)
   #print("received message =",str(message.payload.decode("utf-8")))
   if process_message(message.payload):
      fout.write(message.payload)

def on_publish(client, userdata, mid):
    #logging.debug("pub ack "+ str(mid))
    client.mid_value=mid
    client.puback_flag=True  


def wait_for(client,msgType,period=0.25,wait_time=40,running_loop=False):
    client.running_loop=running_loop #if using external loop
    wcount=0  
    while True:
        #print("waiting"+ msgType)
        if msgType=="PUBACK":
            if client.on_publish:        
                if client.puback_flag:
                    return True
     
        if not client.running_loop:
            client.loop(.01)  #check for messages manually
        time.sleep(period)
        #print("loop flag ",client.running_loop)
        wcount+=1
        if wcount>wait_time:
            print("return from wait loop taken too long")
            return False
    return True 

def send_header(filename):
   header="header"+",,"+filename+",,"
   header=bytearray(header,"utf-8")
   header.extend(b','*(200-len(header)))
   print(header)
   c_publish(client,topic,header,qos)

def send_end(filename):
   end="end"+",,"+filename+",,"+out_hash_md5.hexdigest()
   end=bytearray(end,"utf-8")
   end.extend(b','*(200-len(end)))
   print(end)
   c_publish(client,topic,end,qos)

def c_publish(client,topic,out_message,qos):
   res,mid=client.publish(topic,out_message,qos)#publish
   if res==0: #published ok
      if wait_for(client,"PUBACK",running_loop=True):
         if mid==client.mid_value:
            print("match mid ",str(mid))
            client.puback_flag=False #reset flag
         else:
            raise SystemExit("not got correct puback mid so quitting")
         
      else:
         raise SystemExit("not got puback so quitting")
client= paho.Client("client-001")  #create client object (paho-mqtt 1.x style; 2.x also requires a callback_api_version argument)
######
client.on_message=on_message
client.on_publish=on_publish
client.puback_flag=False #use flag in publish ack
client.mid_value=None
#####
print("connecting to broker ",broker)
client.connect(broker)#connect
client.loop_start() #start loop to process received messages
print("subscribing ")
client.subscribe(topic)#subscribe
time.sleep(2)
start=time.time()
print("publishing ")
send_header(filename)
Run_flag=True
count=0
##hashes
out_hash_md5 = hashlib.md5()
in_hash_md5 = hashlib.md5()

while Run_flag:
   chunk=fo.read(data_block_size) # change if want smaller or larger data blocks
   if chunk:
      out_hash_md5.update(chunk)
      out_message=chunk
      #print(" length =",type(out_message))
      c_publish(client,topic,out_message,qos)
         
   else:
      #no more data: send end block containing the file hash
      send_end(filename)
      Run_flag=False
time_taken=time.time()-start
print("took ",time_taken)
time.sleep(4)
client.disconnect() #disconnect
client.loop_stop() #stop loop
fout.close() #close files
fo.close()

When I run the script with a test file this is what I see:

[Screenshot: send-file-mqtt-output]

Code Notes:

  1. The process_message function handles the received messages and stores the data in a file.
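  2. I use a number of flags attached to the client instance, for example:
    client.puback_flag (set by on_publish when a PUBACK arrives)
    client.mid_value (the message ID of the last acknowledged publish)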

Code Download

Combined Send and receive

Send File and receive file



10 comments

  1. I am new to MQTT and Python.
    I am splitting your example up so that I am publishing on one machine and subscribing on another.
    What documentation should I look at that would point to how to parse out the filename on the receiving machine so that I end up with the same file name on both ends?

    Thank you

    1. Bill
      You need to include the filename as part of the header. The script already does it:
      def send_header(filename):
         header="header"+",,"+filename+",,"
         header=bytearray(header,"utf-8")
         header.extend(b','*(200-len(header)))
         print(header)
      What it doesn’t do is extract the filename when it receives it, so you need to do that bit.
      Use the ask steve page if you are struggling with it and I’ll take a look at it.
      Rgds
      Steve

      1. As an alternative to sticking stuff in headers, you could firstly zip the file (so if it doesn’t unzip you know you’re not dealing with a complete file), then put the filename in the zip entry, or even in the topic name and subscribe to /x/y/# and send the message to /x/y/filename perhaps.

        1. The topic idea would be a good idea if there were lots of file downloads. The hash already checks for the complete file but zipping would probably make it quicker for large files.
          I did create a faster version that didn’t wait for the puback as it slows it down a lot. I don’t think it is quite finished but if anyone is interested in trying it I will email it, just use the ask steve page.
          rgds
          Steve

  2. Hi Steve,
    When I try to run this code separately as subscriber.py and publish.py it fails with the error “return from wait loop taken too long”
    Could you please tell what might be going wrong?
    I am just a beginner in MQTT and python
    Please help me out

    Thanks in Advance.

  3. I don’t detect network failures in this script.

    – Just wondering about the case of a network disconnect. Using clean session, if the client re-establishes the same session, can it continue the download from the point where it left off, or will the client start over?

    1. No, when you reconnect you will have to start the download again.
      However you can implement the functionality in the script. Is it something that you need?

  4. I tried to send a file (size=1mb) and this code works,
    but when I try to send a file of 30mb or bigger, the code runs but the delivered file is not complete (original file size=30mb, new copy file size=40kb)
