How to Log MQTT Sensor Data by Topic Using Python

The MQTT sensor data logger logs data on all monitored topics to the same log files.

If the data needs to be split by topic, that has to be done by the log analyser software.

However, the topic logger logs sensor/device data by topic, so that each topic has its own log file, as shown in the diagram below:

[Diagram: MQTT topic logger file and folder structure]

Note: A Node-RED version of this logger is also available.

Topic Logger Class

This class is similar to the mlogger class but automatically creates the folder hierarchy to match the topic hierarchy.

Data is stored in log files starting with log000.txt. When a log file reaches the maximum size (5MB by default), it is rotated.
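
To make the idea concrete, here is a minimal sketch of how a topic could be mapped to a folder and how size-based rotation might work. It is not the tlogger implementation: the class name TopicFileSketch, the helper topic_to_dir and the way the file counter is kept are all assumptions made for illustration.

```python
import os

def topic_to_dir(base_dir, topic):
    """Map an MQTT topic such as 'sensors/kitchen/temp' to a folder path."""
    # Hypothetical helper: one sub-folder per topic level.
    parts = [p for p in topic.split("/") if p]
    return os.path.join(base_dir, *parts)

class TopicFileSketch:
    """Illustrative only: one rotating log per topic (not the tlogger class)."""

    def __init__(self, base_dir, max_bytes=5 * 1024 * 1024):
        self.base_dir = base_dir
        self.max_bytes = max_bytes
        self.counters = {}  # topic -> index of the current log file

    def current_path(self, topic):
        count = self.counters.setdefault(topic, 0)
        return os.path.join(topic_to_dir(self.base_dir, topic),
                            "log{:03d}.txt".format(count))

    def write(self, topic, line):
        path = self.current_path(topic)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        # Move on to the next file once the current one has reached the limit.
        if os.path.exists(path) and os.path.getsize(path) >= self.max_bytes:
            self.counters[topic] += 1
            path = self.current_path(topic)
        with open(path, "a", encoding="utf-8") as f:
            f.write(line + "\n")
        return path
```

With a very small limit, for example max_bytes=10, two writes to the topic sensors/kitchen/temp end up in log000.txt and then log001.txt inside the sensors/kitchen/temp folder.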

MQTT Data Logger Code Overview

The topic logger uses the tlogger class.

This script logs data on a collection of topics and, by default, only when the data has changed. It logs:

  • Message time
  • Message topic
  • Message

Data is logged as a JSON string. The following lines are added to the on_message callback, or to a function called by the on_message callback.


    data["time"]=tnow
    data["topic"]=topic
    data["message"]=msg
    #log.log_json(data) #log in JSON format

You can log the data directly as shown above, but I’ve commented that line out.

Instead I prefer to use a separate thread to log the data and so I place the data in a queue using the Queue class.

The message handler function calls the has_changed function to check if the message status is different from the last message.

If it is the same then the message isn’t stored (default mode) as there is no point storing the same message value multiple times.

This can be changed using the -s command line switch.

In addition, the message handler function checks whether the msg is already JSON encoded.

If it is, it converts it back to a Python object; otherwise it would be JSON encoded twice.
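
The effect of double encoding is easy to see in a small standalone example. This is only a sketch: the helper name decode_if_json and the sample payload are made up for illustration and are not part of the logger script.

```python
import json

def decode_if_json(payload):
    """Return the decoded object if payload is valid JSON, else the original string."""
    try:
        return json.loads(payload)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return payload

raw = '{"temp": 21.5}'  # a payload that is already JSON encoded

# Encoding the raw string again escapes it inside the outer JSON document.
double = json.dumps({"message": raw})
# Decoding first stores the payload as a nested object instead.
single = json.dumps({"message": decode_if_json(raw)})

print(double)  # {"message": "{\"temp\": 21.5}"}
print(single)  # {"message": {"temp": 21.5}}
```

A plain text payload such as ON is not valid JSON, so it is left unchanged and stored as a string.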

The worker takes the data from the queue and logs it to disk using the log_json method, which converts the data to a JSON encoded string before storing.

The relevant code is shown below.

def on_message(client,userdata, msg):
    topic=msg.topic
    m_decode=str(msg.payload.decode("utf-8","ignore"))
    message_handler(client,m_decode,topic)
    #print("message received")
def message_handler(client,msg,topic):
    data=dict()
    tnow=time.localtime(time.time())
    try:
        msg=json.loads(msg)#convert to a Python object before saving
        #print("json data")
    except ValueError:
        pass #payload is not JSON so keep it as a string
    data["time"]=tnow
    data["topic"]=topic
    data["message"]=msg
    if command.options["storechangesonly"]:
        if has_changed(topic,msg):
            client.q.put(data) #put messages on queue
    else:
        client.q.put(data) #put messages on queue

def has_changed(topic,msg):
    topic2=topic.lower()
    if topic2.find("control")!=-1:
        return False
    if topic in last_message:
        if last_message[topic]==msg:
            return False
    last_message[topic]=msg
    return True
def log_worker():
    """runs in own thread to log data"""
    while Log_worker_flag:
        time.sleep(0.01) #short pause so the loop does not use 100% CPU when idle
        while not q.empty():
            results = q.get()
            if results is None:
                continue
            log.log_json(results)
            #print("message saved ",results["message"])
    log.close_file()

The worker is started at the beginning of the script.

t = threading.Thread(target=log_worker) #start logger
Log_worker_flag=True
t.start() #start logging thread

The Log_worker_flag is used to stop the worker when the script terminates.
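
As an illustration of the stop sequence, here is a self-contained sketch of a worker that is told to stop and then drains whatever is left in the queue. It swaps in a threading.Event for the global flag and a blocking q.get with a timeout instead of polling q.empty; both are my substitutions, and the saved list simply stands in for the log file.

```python
import queue
import threading

q = queue.Queue()
stop_event = threading.Event()  # plays the role of Log_worker_flag
saved = []                      # stand-in for the log file

def log_worker():
    """Keep logging until asked to stop and the queue is empty."""
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.1)  # blocks while idle, so the loop does not spin
        except queue.Empty:
            continue
        saved.append(item)  # the real script calls log.log_json here

t = threading.Thread(target=log_worker)
t.start()

for i in range(5):
    q.put({"message": i})

stop_event.set()  # equivalent of setting Log_worker_flag to False
t.join()          # wait for the worker to finish writing
print(len(saved)) # 5
```

Joining the thread before the script exits gives the worker time to write any messages still waiting in the queue.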

Logging as csv

The logger can also log data in csv format instead of JSON.

The csv file contains a header row, which the logger creates from the incoming data.

In addition, to ensure the header row is in a predetermined order, an optional header file can be used as a template.

Full details are in the read_me file in the download package.
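
The general idea of a header built from the data, with an optional template fixing the column order, can be sketched with the standard csv module. The function build_header and the sample record are hypothetical; the read_me file describes what the logger actually does.

```python
import csv
import io

def build_header(record, template=None):
    """Column order: template fields first, then any remaining keys as they arrive."""
    template = template or []
    extra = [k for k in record if k not in template]
    return list(template) + extra

record = {"topic": "sensors/door", "time": "2024-01-01 12:00:00", "message": "open"}

out = io.StringIO()  # stands in for the csv log file
header = build_header(record, template=["time", "topic"])
writer = csv.DictWriter(out, fieldnames=header)
writer.writeheader()
writer.writerow(record)

print(out.getvalue())
```

Without a template the columns simply follow the key order of the first record, which is why a template is useful when the order matters.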

 

Using the Topic Logger

Note: By default the logger will only log changed data.

For example, a door sensor publishing its status every 10 seconds will send the same message repeatedly until someone opens or closes the door.

You need to provide the script with:

  • A list of topics to monitor
  • The broker name and port
  • A username and password, if needed

The script is run from the command line. Type

python3 mqtt_topic_logger.py -h #Linux
or
C:\python34\python.exe mqtt_topic_logger.py -h #Windows

for a list of options.

Example Usage:

You will always need to specify the broker name or IP address and the topics to log.

Note: You may not need to use the python prefix or may need to use python3 mqtt_topic_logger.py (Linux)

Specify broker and topics

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/#

Specify broker and multiple topics

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -t home/#

Log All Data:

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -s

Log as csv:

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -c

Specify the client name used by the logger

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -n data-logger

Specify the log directory

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -l mylogs
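
If you want to handle the same switches in a script of your own, one way is the standard argparse module. This is a sketch under assumptions and not the option handling used by the logger itself: the attribute names (broker, topics, store_all and so on) are invented here, and the mlogs default is only a guess for the example.

```python
import argparse

def parse_args(argv):
    """Parse the short options used in the examples (illustrative only)."""
    p = argparse.ArgumentParser(description="Topic logger options (sketch)")
    p.add_argument("-b", dest="broker", required=True,
                   help="broker name or IP address")
    p.add_argument("-t", dest="topics", action="append", required=True,
                   help="topic to log; repeat the switch for several topics")
    p.add_argument("-s", dest="store_all", action="store_true",
                   help="log all data, not only changes")
    p.add_argument("-c", dest="csv", action="store_true", help="log as csv")
    p.add_argument("-n", dest="client_name", default=None, help="client name")
    p.add_argument("-l", dest="log_dir", default="mlogs",  # assumed default
                   help="log directory")
    return p.parse_args(argv)

opts = parse_args(["-b", "192.168.1.157", "-t", "sensors/#", "-t", "home/#", "-s"])
print(opts.broker, opts.topics, opts.store_all)
```

Using action="append" for -t is what allows the switch to be repeated, so each -t adds one more topic to the list.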

Comments and Feedback

Was there enough detail in this tutorial for you to follow?

Please help me improve these tutorials by leaving your comments, rating them, and asking questions.

Download

download

If you are using the logger in your projects then let me know and I can feature them on this page.

Testing Notes

I have tested this logger on a Raspberry Pi as the data logger, using a message stream of 40,000 messages, each approximately 60 bytes in length.

When logging changes only, I achieved a message rate of 76 messages/second.

When logging all messages, I achieved a message rate of 50 messages/second.

Note: The topic logger has also been containerised in Docker.

26 comments

  1. Hi Steves,
    Thank you for your work, I have a problem, I would be happy if you can help me,
    There is no problem saving csv without working, but I have problems with json format;
    C:\MQTT_Record\mqtt-topic-logger-master>python mqtt-topic-logger.py -h 192.168.0.89 -t sensor/# -s
    logging level WARNING
    Logging JSON format
    starting storing all data
    Max log size = 10000
    Log Directory = mlogs
    connecting to broker 192.168.0.89
    Connected and subscribed to [('sensor/#', 0)]
    Exception in thread Thread-1 (log_worker):
    Traceback (most recent call last):
    File "C:\Python312\Lib\threading.py", line 1073, in _bootstrap_inner
    self.run()
    File "C:\Python312\Lib\threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
    File "C:\MQTT_Record\mqtt-topic-logger-master\mqtt-topic-logger.py", line 191, in log_worker
    log.log_json(results)
    File "C:\MQTT_Record\mqtt-topic-logger-master\tlogger.py", line 83, in log_json
    self.log_data(jdata,topic)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File "C:\MQTT_Record\mqtt-topic-logger-master\tlogger.py", line 122, in log_data
    fo,writer=self.create_log_file(dir,topic,columns,fo="",count=0)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^
    File "C:\MQTT_Record\mqtt-topic-logger-master\tlogger.py", line 71, in create_log_file

    1. Do you have some sample data of what you are logging? Is it possible to send the raw data to test.mosquitto.org on topic sig/test/logging?
      Rgds
      Steve

      1. Hi again Steve,
        I tried different solution methods on the problem, but I solved it in a different way. Python 3.12 was currently installed. When I had similar problems before, I was installing and testing old versions. I did that again, it worked fine with version 3.7. Of course, I will research a few points and work on its smooth functionality in subsequent versions. If I find a solution, I will share it here. Thank you very much again for your response and work.

  2. Hi,
    Thanks for all your efforts to help de-mystify MQTT.
    I’m using the Topic Logger script on a raspberry pi (python3.9.2) to store JSON, and I have two questions.
    1) If the script is restarted, the existing data in log000.txt is wiped. How can I get it to append to the existing data or increment the older data filename? There’s an integer “number_logs” (-L) but I can’t see that it’s used anywhere?
    2) I believe that the JSON spec. states there should be a comma between values in an array. Where would the best way to add this into your script?

    Thanks again.

      1. I took a look at the script and because of the design it is not an easy task as far as I can see.
        All topics have their own counter.
        You would need to save this info as meta data and read it back in to start and then use the counters.
        You can’t save it as Json as it contains a file pointer.
        Afraid I don’t really have the time to devote to changing it but if you do then please let me know.
        Rgds
        Steve

        1. Sadly, my knowledge of python is not nearly enough to be able to handle such a change.
          For now, I will use the simple data logger, running multiple times for logging the necessary topics.
          Hopefully, there won’t be much additional overhead.
          Thanks for looking.

          Martyn.

  3. Hi, wow impressed great tool you did. Although it doesn’t work perfectly for me – nested json from two sensors attached to one tasmota. Csv is lost totally. But Json format could be I hope easier to work on. I’m absolutely dumb when comes to python, but, what would you think about to add comma after every message logged in. I know I could edit it in notepad of some kind – but you know when hundreds of records comes to play – I’m just humble asking: add the comma, please – then I can wrap myself the whole thing in [ ] and there you go proper json 😉 Excel is happy, me happy, we’ve got the sunshine again,
    Second – the csv thing I was thinking to replace simple all : { } ‘ ” with simple comma as well like csv-simplified I didn’t try it yet though, you know, headers are not the priority – load it to other software is an issue g-sheets or excel to analyse the data. if you don’t have a time don’t bother this is already great tool, appreciate your time and effort

  4. I appreciate the templates and how-to guidance. Having run some tests on both Windows and Linux, I found a couple of tweaks I needed to make for Bluetooth feedback, as the device MAC address in the topic wasn’t compatible with Windows folder naming. I needed to remove the colons:
    in tlogger.py > create_log_dir add log_dir = log_dir.replace(":","")
    in tlogger.py > create_log_file add dir = dir.replace(":","")

  5. Hi Steve,
    I really like the (described) functionality of “mqtt-topic-logger” and plan to use it for my smarthome components.
    Unfortunately I obviously have no clue how to install it….
    I use LINUX @OPENHABIAN , a Raspberry lite based distribution as far as I understood.
    Python works well and I installed the paho-mqtt client successfully. But the script needs to have the “queue” module and can’t find it.
    Same situation on Windows 10 – this was my first try.
    Question: How can I make it run on a Pi? If needed, I add another one!
    My background is EE/SE, but now I’m 60+ and everything works a little slower …
    Any help would be greatly appreciated!
    Best greetings from Berlin – and stay healthy!

    1. Hi
      It is probably a Python version problem and you are picking up python2. Try python --version and see what you get.
      Try running it with python3 instead and see if it works. If so, edit the shebang line. There are some notes on Python versions on the site.
      http://www.steves-internet-guide.com/python-notes/
      Also you may find this useful
      https://stackoverflow.com/questions/29687837/queue-importerror-in-python-3
      Let me know how you get on. I know what you mean by working a little slower.
      Rgds
      Steve

      1. Hi,
        thanks a lot for your quick answer. Please allow one more question…
        What I wanted to have is CSV coded data in folder structures according to the topic.
        I can get folder structures, but just with JSON coded data.
        I had a look in the source (not used to reading Python before…), and CSV logging seems not to be supported yet.
        1) Is this true?
        2) Is there a chance for a CSV version in the near future?
        3) Would a cup of coffee help ? 😉
        Kind regards,
        Stefan

          1. Hi,
            you are totally right. What do you think about a totally different approach:
            Use the file creating function of TOPIC-Logger and just take the JSON parsing out and write the topic as plain text into the appropriate file.
            So it’s dependent on the item if it will be CSV or just text.
            For me it would be totally sufficient and I was just thinking about giving it a try myself 🙂
            It should not take longer than a cup of coffee 😉
            And if you decide to control it by command line your tool is done: JSON and TXT logs…
            Regards,
            Stefan

          2. Hi
            I’ve sent you an email with the modified logger. Let me know how you get on.
            Rgds
            Steve

    2. Hi again,
      installation is solved – but still there are some issues.
      For anybody who has the same low level of knowledge as me:
      I needed to install PIP3 on OPENHABIAN and then “sudo pip3 install paho-mqtt”
      Connection with Mosquitto in OPENHABIAN with user/password failed. Without authentication it works.
      To test you can use e.g. “broker.mqttdashboard.com” as broker.
      I am still fighting with authentication and my own topics, which are not being translated as expected.
      So far – so good and sorry for annoying you 🙂
      Stefan

  6. Hi Steve,

    Thanks for your contribution. I am trying to get Paho-MQTT message data from a subscribed topic and write it to a file.

    import time
    import sys
    import os
    from functools import partial

    import paho.mqtt.client as mqttClient
    # To convert *.txt to *.csv for later use.
    # import pandas as pd

    sys.path.append(os.path.join(os.path.dirname(__file__),'../'))
    from src.node_utils import create_node_logger, connect_to_broker
    from src import topics

    class LoggingNode():
        """# use to log the details of this two topics
        # state_encoders = root + '/' + 'state_encoders'
        # format
        # Instance of State class
        # estimates the robots position based on encoder ticks

        # state_model = root + '/' + 'state_model'
        # format
        # Instance of State class for where the motion model estimates the
        # robot will be based on wheel_velocity
        #------------------------------------------------------------------#
        #------------------------------------------------------------------# """
        def __init__(self, client, logger):
            self.state = None
            self.logger = logger
            self.client = client

        # initialize subscription to encoders and wheel_velocity
        def on_connect(self, userdata, flags, rc):
            suc1 = self.client.subscribe(topics.state_encoders)
            suc2 = self.client.subscribe(topics.state_model)
            self.logger.info('Connected and Subscribed: {} {}'.format(suc1, suc2))

        def on_message(self, client, userdata, message):
            print('heelo')
            print("Message received: " + message.payload)
            with open('/home/test.txt','a+') as f:
                f.write("Message received: " + message.payload + "\n")

    Connected = False #global variable for the state of the connection

    if __name__ == '__main__':
        node_name = 'logging_node'
        logger = create_node_logger(node_name)
        client = connect_to_broker()

        node = LoggingNode(client, logger)

        client.on_connect = node.on_connect
        client.message_callback_add(topics.state_model, node.on_message)
        logger.info('Started logging Node loop')

        try:
            client.loop_forever()
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt seen")

    This is not connecting to logger. Any suggestions ?

  7. Hi Steve….

    Tried your “MQTT_Topic_Logger” for a project I’m starting, perfect for what I needed.
    However the resource was continuously 100%. Is this normal?

    Regards
    Sean

  8. Thanks for the article

    I plan to use this code in a project that I am undertaking. I am collecting data from the various components of a solar power system, temperature sensors, and a WiFi home thermostat. The design is such that each monitored device will publish its data to MQTT. Your code is well suited for collecting and logging any and all payloads from publishers instead of building multiple, separate loggers. Your code for filtering duplicates and time stamping has me rethinking doing that in each of my subsystems and just deferring it to this logger.

    I’ll probably extend it to post payloads to InfluxDB at the same time so that timestamps and duplicate control is synchronized.

    Keep up the good articles. Thanks again.

    1. Glad you find it useful. I also did an SQL one, which you might find useful when you expand it to InfluxDB. There is a link and tutorial on the site.

  9. How can I log multiple clients?
    Say I have node1 and node2 sending the same data, but I want to make the log data say “client”:”espxxxxx”

    1. Hi
      You can’t, as the logger doesn’t know the client id; that is the way MQTT works. The only place it could be done is on the broker.
      However, MQTT v5 has a response topic feature and others that might make it possible in the future, but it would also require the sender to utilise them.
      Rgds
      Steve
