How to Send a File Using MQTT and Python

In this tutorial we will look at how to send a file or picture using MQTT.

The script is written in Python and the approach I used was to send the file as bytes.

There is an IBM tutorial which does the same thing but uses ASCII characters.

The advantage of the ASCII approach is that you can easily add extra information and put the data into a JSON string for sending.

To identify the start and end of the file I use a header block and an end block, each a frame of 200 bytes.
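
As a rough sketch of the framing idea, the two helpers below build and recognise a 200-byte control frame. The names make_frame and parse_frame are hypothetical and are not part of the script further down; the ",," separator and comma padding follow the script.

```python
FRAME_SIZE = 200      # fixed control-frame length used in this tutorial
SEPARATOR = ",,"      # field separator used in this tutorial

def make_frame(kind, *fields):
    """Build a header or end frame padded with commas to FRAME_SIZE bytes."""
    text = SEPARATOR.join((kind,) + fields) + SEPARATOR
    frame = bytearray(text, "utf-8")
    if len(frame) > FRAME_SIZE:
        raise ValueError("fields do not fit in one frame")
    frame.extend(b"," * (FRAME_SIZE - len(frame)))
    return bytes(frame)

def parse_frame(payload):
    """Return (kind, fields) for a control frame, or None for a data block."""
    if len(payload) != FRAME_SIZE:
        return None
    try:
        parts = payload.decode("utf-8").split(SEPARATOR)
    except UnicodeDecodeError:
        return None  # 200 bytes of binary data, not a control frame
    if parts[0] not in ("header", "end"):
        return None
    # drop the empty strings and stray commas produced by the padding
    fields = [p for p in parts[1:] if p.strip(",")]
    return parts[0], fields
```

Checking the frame type as well as the length is a small extra safeguard for a data block that happens to be exactly 200 bytes long.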

The send format is:

  • Send header frame
  • Loop to read data from disk in byte chunks and send each chunk, updating a file hash as you go.
  • Send end block containing the file hash when done.
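
The send steps above can be sketched without any MQTT code as a generator that yields each payload in order. This is only an illustration: file_messages and pad_frame are hypothetical names, and each yielded payload would still have to be published by the sender.

```python
import hashlib

FRAME_SIZE = 200  # control-frame length used in this tutorial

def pad_frame(text):
    """Encode text and pad it with commas to FRAME_SIZE bytes."""
    frame = text.encode("utf-8")
    # assumes the text is shorter than FRAME_SIZE (short filenames)
    return frame + b"," * (FRAME_SIZE - len(frame))

def file_messages(fileobj, filename, block_size=2000):
    """Yield the header frame, the data blocks, then the end frame with the MD5 hash."""
    file_hash = hashlib.md5()
    yield pad_frame("header,," + filename + ",,")
    while True:
        chunk = fileobj.read(block_size)
        if not chunk:
            break                      # end of file reached
        file_hash.update(chunk)        # hash is built up block by block
        yield chunk
    yield pad_frame("end,," + filename + ",," + file_hash.hexdigest() + ",,")
```

A sender would open the file in binary mode and publish each payload from file_messages in turn.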

The receive format is:

  • Read in data and extract the header block.
  • Look for the end block while receiving data.
  • Save the file to disk and calculate a file hash.
  • When the end block is detected, compare the file hash in the received end block with the one calculated at the receiver to verify success.
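
The receive steps above can be sketched as a small class that is fed one payload at a time. FileReceiver is a hypothetical name used only for illustration; in a real receiver the handle method would be called from the on_message callback with message.payload.

```python
import hashlib

FRAME_SIZE = 200  # control-frame length used in this tutorial

class FileReceiver:
    """Collects data blocks between a header frame and an end frame."""

    def __init__(self, out):
        self.out = out                  # writable binary file object
        self.file_hash = hashlib.md5()
        self.verified = None            # True/False once the end frame arrives

    def _control_fields(self, payload):
        """Return the ',,'-separated fields of a control frame, else None."""
        if len(payload) != FRAME_SIZE:
            return None
        fields = payload.decode("utf-8", "ignore").split(",,")
        return fields if fields[0] in ("header", "end") else None

    def handle(self, payload):
        fields = self._control_fields(payload)
        if fields is None:              # ordinary data block
            self.file_hash.update(payload)
            self.out.write(payload)
        elif fields[0] == "end":        # compare the sender's hash with ours
            self.verified = fields[2] == self.file_hash.hexdigest()
        # a header frame carries no file data, so nothing is written for it
```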

Notes:

  1. I didn’t extract the file name from the header to create the received file with the same name as the sent file.
  2. I used a QoS of 1 and waited for a PUBACK before sending the next data block, to be nice to the MQTT broker and to help guarantee a successful send.
  3. I don’t detect network failures in this script.
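
As an alternative to the flag-based wait used in the script below, the object returned by the paho client's publish call can be asked to wait for the acknowledgement. This is a sketch, not the method the script uses: publish_and_confirm is a hypothetical helper, the client is assumed to be a connected paho client with its network loop running, and the timeout argument of wait_for_publish is assumed to be available (it was added in later 1.x releases of paho-mqtt, as far as I know).

```python
def publish_and_confirm(client, topic, payload, timeout=10.0):
    """Publish at QoS 1 and block until the broker acknowledges the message.

    `client` is assumed to be a connected paho.mqtt.client.Client whose
    network loop is already running (for example via client.loop_start()).
    """
    info = client.publish(topic, payload, qos=1)
    if info.rc != 0:                        # 0 is MQTT_ERR_SUCCESS
        raise RuntimeError("publish failed with rc=%s" % info.rc)
    info.wait_for_publish(timeout)          # timeout support is an assumption, see above
    if not info.is_published():
        raise TimeoutError("no PUBACK received for mid %s" % info.mid)
    return info.mid
```

Raising an exception instead of calling SystemExit leaves it to the caller to decide whether to retry or give up.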

The code is shown below. Code notes follow the code.

#! python3.4
###demo code provided by Steve Cope at www.steves-internet-guide.com
##email steve@steves-internet-guide.com
###Free to use for any purpose
"""
Send File Using MQTT
"""
import time
import paho.mqtt.client as paho
import hashlib
#alternative brokers, uncomment the one you want to use
#broker="broker.hivemq.com"
#broker="iot.eclipse.org"
broker="192.168.1.206"
#filename="DSCI0027.jpg"
filename="passwords.py" #file to send
topic="data/files"
qos=1
data_block_size=2000
fo=open(filename,"rb")
fout=open("1out.txt","wb") #use a different filename
# for outfile as I'm running sender and receiver together
def process_message(msg):
   """ This is the main receiver code
   """
   if len(msg)==200: #is header or end
      msg_in=msg.decode("utf-8")
      msg_in=msg_in.split(",,")
      if msg_in[0]=="end": #is it really last packet?
         in_hash_final=in_hash_md5.hexdigest()
         if in_hash_final==msg_in[2]:
            print("File copied OK -valid hash  ",in_hash_final)
         else:
            print("Bad file receive   ",in_hash_final)
         return False
      else:
         if msg_in[0]!="header":
            in_hash_md5.update(msg)
            return True
         else:
            return False
   else:
      in_hash_md5.update(msg)
      return True
#define callback
def on_message(client, userdata, message):
   time.sleep(1)
   #print("received message =",str(message.payload.decode("utf-8")))
   if process_message(message.payload):
      fout.write(message.payload)

def on_publish(client, userdata, mid):
    #logging.debug("pub ack "+ str(mid))
    client.mid_value=mid
    client.puback_flag=True  


def wait_for(client,msgType,period=0.25,wait_time=40,running_loop=False):
    client.running_loop=running_loop #if using external loop
    wcount=0  
    while True:
        #print("waiting"+ msgType)
        if msgType=="PUBACK":
            if client.on_publish:        
                if client.puback_flag:
                    return True
     
        if not client.running_loop:
            client.loop(.01)  #check for messages manually
        time.sleep(period)
        #print("loop flag ",client.running_loop)
        wcount+=1
        if wcount>wait_time:
            print("return from wait loop taken too long")
            return False
    return True 

def send_header(filename):
   header="header"+",,"+filename+",,"
   header=bytearray(header,"utf-8")
   header.extend(b','*(200-len(header)))
   print(header)
   c_publish(client,topic,header,qos)

def send_end(filename):
   end="end"+",,"+filename+",,"+out_hash_md5.hexdigest()
   end=bytearray(end,"utf-8")
   end.extend(b','*(200-len(end)))
   print(end)
   c_publish(client,topic,end,qos)

def c_publish(client,topic,out_message,qos):
   res,mid=client.publish(topic,out_message,qos)#publish
   if res==0: #published ok
      if wait_for(client,"PUBACK",running_loop=True):
         if mid==client.mid_value:
            print("match mid ",str(mid))
            client.puback_flag=False #reset flag
         else:
            raise SystemExit("not got correct puback mid so quitting")
         
      else:
         raise SystemExit("not got puback so quitting")
client= paho.Client("client-001")  #create client object
######
client.on_message=on_message
client.on_publish=on_publish
client.puback_flag=False #use flag in publish ack
client.mid_value=None
#####
print("connecting to broker ",broker)
client.connect(broker)#connect
client.loop_start() #start loop to process received messages
print("subscribing ")
client.subscribe(topic)#subscribe
time.sleep(2)
start=time.time()
print("publishing ")
send_header(filename)
Run_flag=True
count=0
##hashes
out_hash_md5 = hashlib.md5()
in_hash_md5 = hashlib.md5()

while Run_flag:
   chunk=fo.read(data_block_size) # change if want smaller or larger data blocks
   if chunk:
      out_hash_md5.update(chunk)
      out_message=chunk
      #print(" length =",type(out_message))
      c_publish(client,topic,out_message,qos)
         
   else:
      #end of file: the end block carries the file hash, so no separate
      #hash message is published (the receiver would write it to the file)
      send_end(filename)
      Run_flag=False
time_taken=time.time()-start
print("took ",time_taken)
time.sleep(4)
client.disconnect() #disconnect
client.loop_stop() #stop loop
fout.close() #close files
fo.close()

When I run the script with a test file this is what I see:

[Screenshot: send-file-mqtt-output]

Code Notes:

  1. The process_message function handles the received messages and stores the data in a file.
  2. I use a number of flags attached to the client instance like:
    client.puback_flag
    client.mid_value

Code Improvements

You can send file details in the header section as JSON encoded data. The following snippets send the filename in the header and, on the receiving side, extract the filename and use it.

If you need to include a lot of information you can always increase the header size.

On the sender you need to change the send_header and send_end functions.

import json #needed for json.dumps; place with the other imports
def send_header(filename):
   file_data={"filename":filename}
   file_data_json=json.dumps(file_data)
   header="header"+",,"+file_data_json + ",,"
   header=bytearray(header,"utf-8")
   header.extend(b'x'*(200-len(header)))
   print(header)
   c_publish(client,topic,header,qos)
def send_end(filename):
   end="end"+",,"+filename+",,"+out_hash_md5.hexdigest()+",,"
   end=bytearray(end,"utf-8")
   end.extend(b'x'*(200-len(end)))
   print(end)
   c_publish(client,topic,end,qos)

On the receiver you need to change the on_message and process_message functions and add extract_file_data (which also needs import json):

def on_message(client, userdata, message):
   global fout
   #time.sleep(1)
   #print("received message =",str(message.payload.decode("utf-8")))
   if process_message(message.payload):
      fout.write(message.payload)
############
def extract_file_data(file_data):
   data=json.loads(file_data)
   filename=data["filename"]
   return filename
def process_message(msg):
   """ This is the main receiver code
   """
   global fout
   print("received ")
   if len(msg)==200: #is header or end
      msg_in=msg.decode("utf-8","ignore")
      msg_in=msg_in.split(",,")
      if msg_in[0]=="header": #header
         filename=extract_file_data(msg_in[1])
         file_out="copy-"+filename
         fout=open(file_out,"wb") #use a different filename

      if msg_in[0]=="end": #is it really last packet?
         in_hash_final=in_hash_md5.hexdigest()
         if in_hash_final==msg_in[2]:
            print("File copied OK -valid hash  ",in_hash_final)
         else:
            print("Bad file receive   ",in_hash_final)
         return False
      else:
         if msg_in[0]!="header":
            in_hash_md5.update(msg)
            return True
         else:
            return False
   else:
      in_hash_md5.update(msg)
      #msg_in=msg.decode("utf-8","ignore")
      if len(msg) <100:
         print(msg)
      return True
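
As a self-contained illustration of the JSON header idea, the sketch below builds a header frame from a dictionary and reads it back. build_header and read_header are hypothetical names, and the "size" field is only an example of extra information; it is not used by the snippets above. It assumes that no value contains the ",," separator.

```python
import json

HEADER_SIZE = 200  # same frame size as the tutorial; increase for more metadata

def build_header(metadata, size=HEADER_SIZE):
    """Return 'header,,<json>,,' padded with b'x' to exactly `size` bytes."""
    text = "header,," + json.dumps(metadata) + ",,"
    frame = text.encode("utf-8")
    if len(frame) > size:
        raise ValueError("metadata needs %d bytes, frame holds %d" % (len(frame), size))
    return frame + b"x" * (size - len(frame))

def read_header(frame):
    """Return the metadata dict from a header frame built by build_header."""
    fields = frame.decode("utf-8").split(",,")
    if fields[0] != "header":
        raise ValueError("not a header frame")
    return json.loads(fields[1])
```

If the header size is increased, the sender and the receiver must agree on the new value, because the receiver recognises control frames by their length.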

Code Download

The download contains separate send-only and receive-only scripts as well as the combined send/receive script.

Sending the entire File

In the example above we split the file into chunks, which is necessary when sending over the Internet using third-party brokers, as they will probably have message size restrictions.

If you control the broker then you don’t need to do this and you can send the entire file in one go.

All you need to do is read in the file and place it into an MQTT payload. I do this for backing up databases on my system with files around 1MB and it works perfectly.
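
A minimal sketch of the single-message approach is shown below. send_whole_file is a hypothetical helper; the client is assumed to be a connected paho client, and returning an MD5 hash for a later check is an addition for illustration only.

```python
import hashlib

def send_whole_file(client, path, topic, qos=1):
    """Publish the complete file at `path` as a single MQTT payload."""
    with open(path, "rb") as f:
        data = f.read()                      # whole file is held in memory
    info = client.publish(topic, data, qos)  # a bytes payload is sent as is
    return info, hashlib.md5(data).hexdigest()
```

Because the whole file is read into memory and sent as one message, this only suits files that fit within the broker's message size limit.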


47 comments

  1. Hey Steve,

    I find your code really interesting, but I don’t know MQTT well, so I have some problems understanding it. I want to split it into a send and a receive script, but I really can’t figure out what to put where. Would it be possible for you to attach a zip where the code is separated into send and receive sections so it can actually be used in a real-life deployment?

    Also, I really like what you did because it is hard to find good examples of sending files via MQTT, which in itself is a bit unorthodox, but in my use case I still need it 😀

  2. Hi Steve. I have a problem with my project, which involves MQTT on a Raspberry Pi. In my project, the Raspberry Pi is used as a microcontroller (meaning all the sensors are connected directly to the Raspi) and also as a broker (Mosquitto, which I have installed on the Raspi). I have tested the Mosquitto broker using the pub/sub commands in the Raspi terminal and it works fine. I also use Node-RED to make the user interface for my project. Here is where I’m stuck: when I run my script for publishing and subscribing to messages, there are errors like this:

    File "coba_pub.py", line 8, in <module>
      client.connect(mqttBroker,1883)
    File "/usr/local/lib/python2.7/dist-packages/paho_mqtt-1.5.1-py2.7.egg/paho/mqtt/client.py", line 941, in connect
      return self.reconnect()
    File "/usr/local/lib/python2.7/dist-packages/paho_mqtt-1.5.1-py2.7.egg/paho/mqtt/client.py", line 1075, in reconnect
      sock = self._create_socket_connection()
    File "/usr/local/lib/python2.7/dist-packages/paho_mqtt-1.5.1-py2.7.egg/paho/mqtt/client.py", line 3546, in _create_socket_connection
      return socket.create_connection(addr, source_address=source, timeout=self._keepalive)
    File "/usr/lib/python2.7/socket.py", line 575, in create_connection
      raise err
    socket.timeout: timed out

    Here is my python script:

    import paho.mqtt.client as mqtt
    from random import randrange, uniform
    import time
    mqttBroker="mqtt.eclipse.org"
    client = mqtt.Client("Temperature inside")
    client.connect(mqttBroker,1883,60)
    while True:
        randNumber = uniform(20.0, 21.0)
        client.publish("TEMPERATURE", randNumber)
        print("Just published " + str(randNumber) + " to topic TEMPERATURE")
        time.sleep(1)

    Can you help me figure this out? Thanks

  3. Hi Steve, the receiving code contains sending code too. Do we need to comment it out first? Does the same go for the sending code? I ask because I put the sending code on my laptop and the receiving code on my broker, to send the image from the laptop to the broker.

    After a few attempts, the “copy-picture” file does appear on the broker but it cannot be opened, and there was a socket error even though my laptop and the broker can ping each other. I guess the code in the receiving script is redundant because it contains the send script? Do correct me if I’m wrong.

    1. With the download there is the option of a sending only and receiving only script rather than the combined send/receive script

  4. Hi Steve! Great tutorial you have here. But I have a question regarding sending and receiving the data. I implemented the enhancement code and run the send code on my laptop and the receive code on my server. I want to send the image from my laptop to the server. When I run both scripts the image does appear on the server, but the two did not actually connect: the picture came from the sending code inside the receiving script.

    The error I get on the server is an OSError saying that the host cannot connect, or a socket problem. But I pinged between the laptop and the server and it works. The error on my laptop is “not got puback so quitting”. Can you help me? Thank you

    1. It sounds like they cannot connect to the MQTT server. Do a simple MQTT test using the mosquitto_pub/sub tools.

  5. Hi Steve,
    I have a question regarding send and receive a file. Do we need to place the picture/specific files in any specific folder so when they want to be sent to other servers/client they can extract the picture there? I mean how can they know or where you place the picture so the picture can be sent? Is there any path we need to do or create?

    Thank you!

      1. I have been searching for the enhancement part about the header but cannot find it. Could you attach the link so I can continue my perusal? It would be a big help for me!

        Thank You, Steve!

        1. This is copied from the Code Improvements section of the tutorial.

          1. The script defaults to the local folder, so you will need to adjust the filename if you place the picture file in a different location.

    1. Yes, but you would need to have a mechanism to determine the file name. If you look at the linked article, it uses JSON to do this.
      I prefer to keep it as binary, so you would need to use a header that you can pick up that details the payload.
      rgds
      steve

  6. Hello Steve,
    In your implementation I could figure out that you subscribe to the topic initially and later on just publish the file info.
    But I am trying to establish two-way communication continuously, i.e. every time I want to send a chunk I expect a subscription and want to receive data from the client and then publish.
    Can you please guide me how to achieve this?
    Thank you in advance!

      1. Hello steve,
        This is also not exactly what i wanted.
        On this section the publish of data is in regular intervals of time. But i was expecting that once clientA publishes some data then clientB would subscribe and publish to clientB and then again clientA would subscribe and then publish….. A kind of feedback where the one will not publish unless the other client publishes somedata.

      2. Hello steve,
        This is also not exactly what i wanted.
        Here the publish is happening independent of the other client’s response.
        Like it’s happening continuously.
        What I wanted was: when clientA publishes, then clientB subscribes and then publishes; this is like conditional publish and subscribe.
        A kind of feedback where the clients will not publish to one another until they receive some message on subscription.

        1. Hi
          I don’t have an example of exactly what you want but you should be able to modify the code in the two way communication to do it.
          Rgds
          Steve

  7. Hi, I was trying to use your code to be able to send a bin file using MQTT to a microcontroller, STM32. I’m able to receive one message using the send script but not the rest. I feel like the script is sending the messages too quickly through MQTT broker, I was wondering how I could slow down the publish. When I add a time.sleep() it seems to mess up the code.

    1. Yes, you should be able to put a sleep statement in the send like below:
      if chunk:
         out_hash_md5.update(chunk) #update hash
         out_message=chunk
         #print(" length =",type(out_message))
         bytes_out=bytes_out+len(out_message)
         c_publish(client,topic,out_message,qos)
         time.sleep(1)

  8. Hi Steve,

    I have send the .png file through this code and it works.
    My question is: It is sending a file in byte array how to read that file(i mean a person who is a subscriber how can he see the image clearly) ?

    Thanks
    Ankur

  9. hi could you show me how to do the exact same thing here but in node red and a python function?and also how to call a node from a json formatted data in the python3 function and send it towards other nodes? i really appreciate your work. thanks steve!

    1. Hi
      Are you looking to use node-red to send/receive or a mixture of node-red and python?
      If python do you want to put the python code into the python3 node?
      An alternative is to use the exec node to run the Python code as a system command.
      Rgds
      Steve

    1. I will try but it will take some time as I have lots of other work to do at the moment
      rgds
      steve

  10. Hi,

    it seems to work, but it never receives an EOF to finish the file…
    Any clue, how to accomplish this?

    Thanks,

    desq

    desq@iohost:~$ python3 rec.py
    connecting to broker 10.0.1.3
    subscribing
    received
    found data bytes= 27529
    received
    found data bytes= 55058
    received
    found data bytes= 82587
    received
    found data bytes= 110116
    received
    found data bytes= 137645
    received
    found data bytes= 165174

  11. Hello Steve,
    Thank you for the Tutorial !!! Question I have is regarding data file that needs to be chucked based on limited payload size upload. I have MQTT APIs of data elements that would have samples at a 1 second sequence rate in the data log file…to chuck the data would it be recommended to calculate the number of upload of the chucking and have this in the header so that the backend knows it needs to reassemble the data?

    1. Hi
      I assume you mean chunked.
      The data is already split into chunks.
      Look for
      data_block_size=2000
      and set it to what you want.
      2000 = 2000 bytes
      rgds
      steve

  12. I am new to MQTT and Python.
    I am splitting your example up so that I am publishing on one machine and subscribing on another.
    What documentation should I look at that would point to how to parse out the filename on the receiving machine so that I end up with the same file name on both ends?

    Thank you

    1. Bill
      You need to include the filename as part of the header. The script already does it:
      def send_header(filename):
         header="header"+",,"+filename+",,"
         header=bytearray(header,"utf-8")
         header.extend(b','*(200-len(header)))
         print(header)
      What it doesn’t do is extract the filename when it receives it, so you need to do that bit.
      Use the ask steve page if you are struggling with it and I’ll take a look at it.
      Rgds
      Steve

      1. As an alternative to sticking stuff in headers, you could firstly zip the file (so if it doesn’t unzip you know you’re not dealing with a complete file), then put the filename in the zip entry, or even in the topic name and subscribe to /x/y/# and send the message to /x/y/filename perhaps.

        1. The topic idea would be a good one if there were lots of file downloads. The hash already checks for the complete file, but zipping would probably make it quicker for large files.
          I did create a faster version that didn’t wait for the PUBACK, as that slows it down a lot. I don’t think it is quite finished, but if anyone is interested in trying it I will email it; just use the ask steve page.
          rgds
          Steve

  13. Hi Steve,
    When I try to run this code separately as subscriber.py and publish.py it is failing throwing error as “return from wait loop taken too long”
    Could you please tell what might be going wrong?
    I am just a beginner in MQTT and python
    Please help me out

    Thanks in Advance.

  14. I don’t detect network failures in this script.

    – Just wondering about the case of a network disconnect. Using clean session, if the client re-establishes the same session, can it download from the point where it left off, or will the client start over?

    1. No, when you reconnect you will have to start the download again.
      However, you can implement the functionality in the script. Is it something that you need?
