How to Log MQTT Sensor Data by Topic Using Python

The MQTT sensor data logger logs data on all monitored topics to the same log files.

If the data needs to be split by topic, that has to be done by the log analyser software.



However, the topic logger logs sensor/device data by topic, so that each topic has its own log file, as shown in the diagram below:

[Figure: MQTT topic logger file and folder structure]

Note: Node-red version here

Topic Logger Class

This class is similar to the mlogger class but automatically creates the folder hierarchy to match the topic hierarchy.

Data is stored in log files starting with log000.txt. When a log file reaches the maximum size, which is 5MB by default, it is rotated.
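The folder mapping and file rotation described above can be sketched roughly as follows. This is not the tlogger source: the names `topic_to_dir`, `next_log_path` and `MAX_LOG_SIZE` are assumptions made for illustration, and the real class may choose the next file differently.

```python
import os

MAX_LOG_SIZE = 5 * 1024 * 1024  # assumed value: the 5MB default mentioned in the text


def topic_to_dir(log_root, topic):
    """Map an MQTT topic such as sensors/kitchen/temp to a folder path."""
    # Drop empty levels and path tricks such as ".." (illustrative safeguard).
    parts = [p for p in topic.split("/") if p and p not in (".", "..")]
    return os.path.join(log_root, *parts)


def next_log_path(log_dir, max_size=MAX_LOG_SIZE):
    """Return the first logNNN.txt in log_dir that is still below max_size."""
    os.makedirs(log_dir, exist_ok=True)
    number = 0
    while True:
        path = os.path.join(log_dir, "log{:03d}.txt".format(number))
        if not os.path.exists(path) or os.path.getsize(path) < max_size:
            return path
        number += 1
```

With this sketch, a message on sensors/kitchen/temp would be written under a sensors/kitchen/temp folder below the log directory, starting with log000.txt and moving on to log001.txt once the first file reaches the size limit.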

MQTT Data Logger Code Overview

The topic logger uses the tlogger class.

This script logs data on a collection of topics and, by default, only when the data has changed. It logs:

  • Message time
  • Message topic
  • Message

Data is logged as a JSON string. The following lines are added to the on_message callback, or to a function called by the on_message callback.


    data["time"]=tnow
    data["topic"]=topic
    data["message"]=msg
    #log.log_json(data) #log in JSON format

You can log the data directly as shown above, but I’ve commented it out.

Instead I prefer to use a separate thread to log the data and so I place the data in a queue using the Queue class.

The message handler function calls the has_changed function to check if the message status is different from the last message.

If it is the same then the message isn’t stored (default mode) as there is no point storing the same message value multiple times.

This can be changed using the -s command line switch.

In addition the message handler function checks if the msg is already JSON encoded.

If it is, it converts it back to a Python object; otherwise it would be JSON encoded twice.
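To see why this check matters, here is a small standalone illustration. It is not taken from the script, and `decode_if_json` is a hypothetical helper name. If a payload that is already JSON is placed in the record as a plain string, `json.dumps` escapes it a second time.

```python
import json


def decode_if_json(payload):
    """Return the decoded object if payload is JSON, else the original string."""
    try:
        return json.loads(payload)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return payload


payload = '{"temp": 21.5}'  # example payload that is already JSON encoded

# Without decoding, the payload ends up as an escaped string inside the record.
double_encoded = json.dumps({"message": payload})
# With decoding, the payload ends up as a nested object.
single_encoded = json.dumps({"message": decode_if_json(payload)})

print(double_encoded)  # {"message": "{\"temp\": 21.5}"}
print(single_encoded)  # {"message": {"temp": 21.5}}
```

A plain text payload such as ON is not valid JSON, so the helper returns it unchanged.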

The worker takes the data from the queue and logs it to disk using the log_json method, which converts the data to a JSON encoded string before storing.

The relevant code is shown below.

import json
import time
#command.options and last_message are defined elsewhere in the script

def on_message(client,userdata, msg):
    topic=msg.topic
    m_decode=str(msg.payload.decode("utf-8","ignore"))
    message_handler(client,m_decode,topic)
    #print("message received")

def message_handler(client,msg,topic):
    data=dict()
    tnow=time.localtime(time.time())
    try:
        msg=json.loads(msg) #convert JSON to a Python object before saving
        #print("json data")
    except ValueError: #payload is not JSON so keep it as a string
        pass
        #print("not already json")
    data["time"]=tnow
    data["topic"]=topic
    data["message"]=msg
    if command.options["storechangesonly"]:
        if has_changed(topic,msg): #call matches the has_changed(topic,msg) definition
            client.q.put(data) #put messages on queue
    else:
        client.q.put(data) #put messages on queue

def has_changed(topic,msg):
    topic2=topic.lower()
    if topic2.find("control")!=-1:
        return False
    if topic in last_message:
        if last_message[topic]==msg:
            return False
    last_message[topic]=msg
    return True

def log_worker():
    """runs in own thread to log data"""
    while Log_worker_flag:
        while not q.empty():
            results = q.get()
            if results is None:
                continue
            log.log_json(results)
            #print("message saved ",results["message"])
    log.close_file()

The worker is started at the beginning of the script.

t = threading.Thread(target=log_worker) #start logger
Log_worker_flag=True
t.start() #start logging thread

The Log_worker_flag is used to stop the worker when the script terminates.
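The worker loop above polls the queue continuously while it is empty, which can keep a CPU core busy. One possible variant, sketched here under assumptions rather than taken from the script, blocks on `Queue.get` with a timeout so the thread sleeps while idle. `FakeLog` is a stand-in for the logger object and `stop_event` takes the place of `Log_worker_flag`; both names are hypothetical.

```python
import queue
import threading


class FakeLog:
    """Stand-in for the real logger object; it only collects records."""

    def __init__(self):
        self.records = []
        self.closed = False

    def log_json(self, data):
        self.records.append(data)

    def close_file(self):
        self.closed = True


def log_worker(q, log, stop_event):
    """Log queued records until stop_event is set and the queue is drained."""
    while not (stop_event.is_set() and q.empty()):
        try:
            results = q.get(timeout=0.5)  # blocks instead of spinning
        except queue.Empty:
            continue
        if results is not None:
            log.log_json(results)
    log.close_file()


q = queue.Queue()
log = FakeLog()
stop_event = threading.Event()
t = threading.Thread(target=log_worker, args=(q, log, stop_event))
t.start()

q.put({"topic": "sensors/door", "message": "open"})
stop_event.set()  # ask the worker to finish
t.join()          # returns once the queue has been drained
```

The timeout is a trade-off: a longer value means fewer wake-ups while idle, but the worker takes longer to notice that it has been asked to stop.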

Using the Topic Logger

Note: By default the logger will only log changed data.

For example, a door sensor publishing its status every 10 seconds will send the same message repeatedly until someone opens or closes it.

You need to provide the script with:

  • List of topics to monitor
  • Broker name and port
  • Username and password, if needed

The script is run from the command line. Type

python3 mqtt_topic_logger.py -h #Linux
or
C:\python34\python.exe mqtt_topic_logger.py -h #Windows

for a list of options.

Example Usage:

You will always need to specify the broker name or IP address and the topics to log.

Note: You may not need to use the python prefix or may need to use python3 mqtt_topic_logger.py (Linux)

Specify broker and topics

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/#

Specify broker and multiple topics

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -t home/#

Log All Data:

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -s

Specify the client name used by the logger

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -n data-logger

Specify the log directory

python mqtt_topic_logger.py -b 192.168.1.157 -t sensors/# -l mylogs
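For readers who want to build a similar command line into their own script, the switches used in the examples above could be parsed along these lines. This is only a sketch using the standard argparse module. The logger's own option handling may be implemented differently, and the destination names (`broker`, `topics`, `store_all`, `client_name`, `log_dir`) and the `None` defaults are assumptions.

```python
import argparse


def parse_args(argv=None):
    """Parse the switches shown in the usage examples (sketch only)."""
    parser = argparse.ArgumentParser(description="MQTT topic logger options")
    parser.add_argument("-b", dest="broker", required=True,
                        help="broker name or IP address")
    parser.add_argument("-t", dest="topics", action="append", required=True,
                        help="topic to log; repeat the switch for several topics")
    parser.add_argument("-s", dest="store_all", action="store_true",
                        help="log all data, not only changes")
    parser.add_argument("-n", dest="client_name", default=None,
                        help="client name used by the logger")
    parser.add_argument("-l", dest="log_dir", default=None,
                        help="log directory")
    return parser.parse_args(argv)


args = parse_args(["-b", "192.168.1.157", "-t", "sensors/#", "-t", "home/#", "-s"])
print(args.broker, args.topics, args.store_all)
```

Using `action="append"` is what lets -t be repeated, so each occurrence adds one topic to the list.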


Download

If you are using the logger in your projects then let me know and I can feature them on this page.

Testing Notes

I have tested this logger using a Raspberry Pi as the data logger, with a stream of 40,000 messages, each approximately 60 bytes long.

When logging changes only, I achieved a rate of 76 messages/second.

When logging all messages, I achieved a rate of 50 messages/second.

Note: The topic logger has been containerised in Docker; see here


16 comments

  1. Hi Steve,
    I really like the (described) functionality of “mqtt-topic-logger” and plan to use it for my smarthome components.
    Unfortunately I obviously have no clue how to install it….
    I use LINUX @OPENHABIAN, a Raspberry lite based distribution as far as I understood.
    Python works well and I installed the paho-mqtt client successfully. But the script needs to have the “queue” module and can’t find it.
    Same situation on Windows 10 – this was my first try.
    Question: How can I make it run on a Pi? If needed, I add another one!
    My background is EE/SE, but now I’m 60+ and everything works a little slower …
    Any help would be greatly appreciated!
    Best greetings from Berlin – and stay healthy!

    1. Hi
      It is probably a Python version problem and you are picking up python2. Try python --version and see what you get.
      Try running the script using python3 and see if it works. If so, edit the shebang line. There are some notes on Python versions on the site.
      http://www.steves-internet-guide.com/python-notes/
      Also you may find this useful
      https://stackoverflow.com/questions/29687837/queue-importerror-in-python-3
      Let me know how you get on. I know what you mean by working a little slower.
      Rgds
      Steve

      1. Hi,
        thanks a lot for your quick answer. Please allow one more question…
        What I wanted to have is CSV coded data in folder structures according to the topic.
        I can get folder structures, but just with JSON coded data.
        I had a look in the source (not used to reading Python before…), and CSV logging seems not to be supported yet.
        1) Is this true?
        2) Is there a chance for a CSV version in the near future?
        3) Would a cup of coffee help ? 😉
        Kind regards,
        Stefan

          1. Hi,
            you are totally right. What do you think about a totally different approach:
            Use the file creating function of TOPIC-Logger and just take the JSON parsing out and write the topic as plain text into the appropriate file.
            So it’s dependent on the item if it will be CSV or just text.
            For me it would be totally sufficient and I was just thinking about giving it a try by myself 🙂
            Should not take longer than a cup of coffee 😉
            And if you decide to control it by command line your tool is done: JSON and TXT logs…
            Regards,
            Stefan

    2. Hi again,
      installation is solved – but still there are some issues.
      For anybody who has the same low level of knowledge as me:
      I needed to install PIP3 on OPENHABIAN and then “sudo pip3 install paho-mqtt”
      Connection with Mosquitto in OPENHABIAN with user/password failed. Without authentication it works.
      To test you can use e.g. “broker.mqttdashboard.com” as broker.
      I still fight with authentication and my own topics, which are not being translated as expected.
      So far – so good and sorry for annoying you 🙂
      Stefan

  2. Hi Steve,

    Thanks for your contribution. I am trying to get Paho-MQTT message data from a subscribed topic and write it to a file.

    import time
    import sys
    import os
    from functools import partial

    import paho.mqtt.client as mqttClient
    # To convert *.txt to *.csv for later use.
    # import pandas as pd

    sys.path.append(os.path.join(os.path.dirname(__file__),'../'))
    from src.node_utils import create_node_logger, connect_to_broker
    from src import topics

    class LoggingNode():
        """# use to log the details of this two topics
        # state_encoders = root + '/' + 'state_encoders'
        # format
        # Instance of State class
        # estimates the robots position based on encoder ticks

        # state_model = root + '/' + 'state_model'
        # format
        # Instance of State class for where the motion model estimates the
        # robot will be based on wheel_velocity
        #------------------------------------------------------------------#
        #------------------------------------------------------------------# """
        def __init__(self, client, logger):
            self.state = None
            self.logger = logger
            self.client = client

        # initialize subscription to encoders and wheel_velocity
        def on_connect(self, userdata, flags, rc):
            suc1 = self.client.subscribe(topics.state_encoders)
            suc2 = self.client.subscribe(topics.state_model)
            self.logger.info('Connected and Subscribed: {} {}'.format(suc1, suc2))

        def on_message(self, client, userdata, message):
            print('heelo')
            print("Message received: " + message.payload)
            with open('/home/test.txt','a+') as f:
                f.write("Message received: " + message.payload + "\n")

    Connected = False #global variable for the state of the connection

    if __name__ == '__main__':
        node_name = 'logging_node'
        logger = create_node_logger(node_name)
        client = connect_to_broker()

        node = LoggingNode(client, logger)

        client.on_connect = node.on_connect
        client.message_callback_add(topics.state_model, node.on_message)
        logger.info('Started logging Node loop')

        try:
            client.loop_forever()
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt seen")

    This is not connecting to the logger. Any suggestions?

  3. Hi Steve….

    Tried your “MQTT_Topic_Logger” for a project I’m starting, perfect for what I needed.
    However, the resource usage was continuously at 100%. Is this normal?

    Regards
    Sean

  4. Thanks for the article

    I plan to use this code in a project that I am undertaking. I am collecting data from the various components of a solar power system, temperature sensors, and a WiFi home thermostat. The design is such that each monitored device will publish its data to MQTT. Your code is well suited for collecting and logging any and all payloads from publishers instead of building multiple, separate loggers. Your code for filtering duplicates and time stamping has me rethinking doing that in each of my subsystems and just deferring it to this logger.

    I’ll probably extend it to post payloads to InfluxDB at the same time so that timestamps and duplicate control are synchronized.

    Keep up the good articles. Thanks again.

    1. Glad you find it useful. I also did an SQL one, which you might find useful when you expand it to InfluxDB. There is a link and tutorial on the site.

  5. How can I log multiple clients?
    Say I have node1 and node2 sending the same data, but I want to make the log data say “client”:”espxxxxx”

    1. Hi
      You can’t, as the logger doesn’t know the client id; that is the way MQTT works. The only place it could be done is on the broker.
      However, MQTT v5 has a response topic feature and others that might make it possible in the future, but it would also require the sender to utilise them.
      Rgds
      Steve
