Storing MQTT Messages (Sensor Data) in a SQLite Database With Python


In this project we will create a simple data logger that stores MQTT messages in a SQLite database.

MQTT message data can be JSON or plain text and is stored as is.

The project consists of two modules:

  • A SQL logger class module, sql_logger.py
  • The logging script, mqtt-data-logger-sql.py

The script uses the main thread to receive the messages (in the on_message callback) and a worker thread to store the data.

A queue is used to move the messages between threads.

Prerequisites

  • Python
  • sqlite3 – part of the Python standard library, so no separate install is needed
  • paho-mqtt – pip install paho-mqtt

SQLITE Basics

Tutorialspoint has a good introduction to using SQLite with Python.

If you are new to SQL databases then I recommend working through these SQLite tutorials first.

There are three basic things we need to do.

  • Create the database
  • Create the table to store the data.
  • Create the SQL query to write the data

Creating the Database

This is pretty straightforward as all we need to do is decide on a database name and location. You can create the database from within the script or externally using SQL commands.

Creating the Table

To do this you need to know what data you will store and its type. SQLite doesn’t have many data types, and the main ones used in the script are TEXT and INTEGER.

Data is arranged in columns which require a name and each data entry is a row.

Again you can create the table from the SQLite command line or from within the script.
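As a minimal sketch of these first two steps using Python's built-in sqlite3 module: the file name logs.db and the table name logs match the ones used by the script, but the temporary directory and the column types are my assumptions for illustration.

```python
import os
import sqlite3
import tempfile

# Assumed location: a temporary directory, so the sketch leaves nothing behind.
db_path = os.path.join(tempfile.mkdtemp(), "logs.db")

# connect() creates the database file if it does not already exist.
conn = sqlite3.connect(db_path)

# Column names follow this article; the column types are an assumption.
conn.execute(
    "CREATE TABLE IF NOT EXISTS logs ("
    "id INTEGER PRIMARY KEY, "
    "time INTEGER, topic TEXT, sensor TEXT, message TEXT)"
)
conn.commit()

# Query the schema to confirm that the table now exists.
tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table' AND name='logs'")]
print(tables)
conn.close()
```

If you pass a bare file name such as logs.db to connect(), the file is created in the current working directory.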

Database Query

This is the most difficult part. If you need to build your own query, I suggest testing it from the SQLite command line before you code it in Python.

The basic structure is:

INSERT INTO TABLE_NAME (COLUMN NAMES) VALUES(COLUMN VALUES)
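Here is a small, self-contained sketch of that structure using the sqlite3 module and ? placeholders. The table layout and the sample values are made up for the example and use an in-memory database.

```python
import sqlite3

# In-memory database so the example is self-contained.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE logs (time INTEGER, topic TEXT, sensor TEXT, message TEXT)")

# The ? placeholders are filled, in order, from the values list.
query = "INSERT INTO logs (time,topic,sensor,message) VALUES (?,?,?,?)"
values = [1700000000, "sensors/lounge/light", "light", "ON"]  # made-up sample data
conn.execute(query, values)
conn.commit()

# Read the row back to check what was stored.
rows = conn.execute("SELECT time,topic,sensor,message FROM logs").fetchall()
print(rows)
conn.close()
```

Using placeholders rather than building the values into the string means that quotes inside a message cannot break the query.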

Sensor Data Characteristics

Because sensor data is often repetitive, the script by default only logs (stores) changed data.

For example, if a sensor sends its status as “ON” once a second, logging everything would store 3600 “ON” messages every hour. The script instead logs only one message, and logs again only when the value changes.

You can override this using the -s option.

SQL Logger Class

The class is implemented in a module called sql_logger.py.

It has five main methods:

  • __init__ – initialises the class; takes the database file name
  • Log_sensor – logs the sensor data
  • Log_message – replaced by Log_sensor
  • drop_table – drops a table
  • create_table – creates a table

To create an instance you need to supply a single parameter – the database file name:

logger=SQL_data_logger(db_file)

You then create a table to store the data, and optionally delete the old data first by dropping the old table:

logger.drop_table("logs")
logger.create_table("logs",table_fields)

The database fields are the fields that you will be storing. In my case they are:

  • Time
  • Topic
  • Sensor name
  • Message

To store data in the database you use the Log_sensor method with two parameters as shown:

logger.Log_sensor(data_query,data_out)

The data_query parameter contains the SQL statement that you want to execute, and the data_out parameter contains a list of field values:

data_query="INSERT INTO "+ Table_name \
    +"(time,topic,sensor,message)VALUES(?,?,?,?)"
data_out=[time,topic,sensor,message]
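The sql_logger.py module itself is not listed in this article. To make the calls above concrete, here is a hypothetical stand-in that has the same method names and a conn attribute. The method bodies, and the assumption that table_fields is a dictionary mapping column names to SQLite types, are mine and may differ from the real module.

```python
import sqlite3


class SQL_data_logger:
    """Hypothetical stand-in for the class in sql_logger.py."""

    def __init__(self, db_file):
        # Open (or create) the database file.
        self.conn = sqlite3.connect(db_file)

    def drop_table(self, table):
        # Remove the table, and with it any old data.
        self.conn.execute("DROP TABLE IF EXISTS " + table)
        self.conn.commit()

    def create_table(self, table, fields):
        # Assumption: fields maps column names to SQLite types.
        columns = ", ".join(name + " " + ctype for name, ctype in fields.items())
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS " + table + " (" + columns + ")")
        self.conn.commit()

    def Log_sensor(self, query, values):
        # Run the INSERT with the values bound to the ? placeholders.
        self.conn.execute(query, values)
        self.conn.commit()


# Assumed field definitions, for illustration only.
table_fields = {"time": "INTEGER", "topic": "TEXT",
                "sensor": "TEXT", "message": "TEXT"}
logger = SQL_data_logger(":memory:")
logger.drop_table("logs")
logger.create_table("logs", table_fields)
logger.Log_sensor(
    "INSERT INTO logs (time,topic,sensor,message) VALUES (?,?,?,?)",
    [1700000000, "sensors/lounge/temp", "temp", "21.5"],
)
stored = logger.conn.execute("SELECT topic, message FROM logs").fetchall()
print(stored)
logger.conn.close()
```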

MQTT Data Logger

This script will store messages on a collection of topics. It stores:

  • Message time
  • Message topic
  • Message

The on_message callback calls the message_handler function to process the message.

The message_handler function calls the has_changed function to check whether the message is different from the last message received on that topic.

If it is the same then the message isn’t stored, as there is no point storing the same value multiple times.

If it is different it is placed on the queue.

The worker takes the data from the queue and logs it to disk.

The relevant code is shown below.

def on_message(client,userdata, msg):
    topic=msg.topic
    m_decode=str(msg.payload.decode("utf-8","ignore"))
    message_handler(client,m_decode,topic)
    #print("message received")

def message_handler(client,msg,topic):
    data=dict()
    #epoch timestamp; a struct_time cannot be bound as a SQLite parameter
    tnow=time.time()
    data["time"]=tnow
    data["topic"]=topic
    data["message"]=msg
    if has_changed(topic,msg):
        print("storing changed data",topic, "   ",msg)
        q.put(data) #put messages on queue

def has_changed(topic,msg):
    topic2=topic.lower()
    if topic2.find("control")!=-1: #topics containing "control" are never logged
        return False
    if topic in last_message:
        if last_message[topic]==msg:
            return False
    last_message[topic]=msg
    return True

def log_worker():
    """runs in own thread to log data"""
    logger=SQL_data_logger(db_file)
    logger.drop_table("logs")
    logger.create_table("logs",table_fields)
    while Log_worker_flag:
        time.sleep(0.01) #short delay so the loop does not hog the CPU
        while not q.empty():
            data = q.get()
            if data is None:
                continue
            try:
                timestamp=data["time"]
                topic=data["topic"]
                message=data["message"]
                sensor="Dummy-sensor"
                data_out=[timestamp,topic,sensor,message]
                data_query="INSERT INTO "+ Table_name \
                    +"(time,topic,sensor,message)VALUES(?,?,?,?)"
                logger.Log_sensor(data_query,data_out)
            except Exception as e:
                print("problem with logging ",e)
    logger.conn.close()

The worker is started at the beginning of the script.

t = threading.Thread(target=log_worker) #start logger
Log_worker_flag=True
t.start() #start logging thread

The Log_worker_flag is used to stop the worker when the script terminates: setting it to False ends the worker's loop.
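The worker checks the queue repeatedly in a loop. One alternative, offered here as a suggestion rather than as part of the original script, is to block on q.get() with a timeout so that the thread sleeps while the queue is empty. In this sketch the stored list stands in for the database write and stop_event plays the role of Log_worker_flag; both are assumptions for illustration.

```python
import queue
import threading

q = queue.Queue()
stop_event = threading.Event()   # plays the role of Log_worker_flag
stored = []                      # stand-in for the database write


def log_worker():
    """Runs in its own thread and stores queued messages."""
    # Keep going until asked to stop and the queue has been drained.
    while not stop_event.is_set() or not q.empty():
        try:
            # Blocks for up to 0.1 s, so the thread sleeps while idle.
            data = q.get(timeout=0.1)
        except queue.Empty:
            continue
        stored.append(data)
        q.task_done()


t = threading.Thread(target=log_worker)
t.start()

for i in range(3):
    q.put({"topic": "sensors/test", "message": str(i)})

q.join()          # wait until every queued message has been handled
stop_event.set()  # ask the worker to finish
t.join()
print(len(stored))
```

The timeout is there so that the worker still notices the stop request within a fraction of a second.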

Using the Data Logger

You need to provide the script with:

  • A list of topics to monitor
  • The broker name and port
  • A user name and password, if needed
  • Base log directory and number of logs (both have defaults)

The script can also be run from the command line. Type

python mqtt-data-logger-sql.py -h

for a list of options.

Example Usage:

You will always need to specify the broker name or IP address and the topics to log.

Note: You may not need to use the python prefix, or you may need to use python3 mqtt-data-logger-sql.py (Linux).

Specify broker and topics

python mqtt-data-logger-sql.py -b 192.168.1.157 -t sensors/#

Specify broker and multiple topics

python mqtt-data-logger-sql.py -b 192.168.1.157 -t sensors/# -t home/#

Log All Data:

python mqtt-data-logger-sql.py -b 192.168.1.157 -t sensors/# -s

Specify the client name used by the logger

python mqtt-data-logger-sql.py -b 192.168.1.157 -t sensors/# -n data-logger
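The option handling is not listed in this article. As a rough sketch only, the flags used in the examples above (-b, -t, -s and -n) could be parsed with argparse as follows. The long option names, the default client name and the help texts are assumptions, and the real script may parse its arguments differently.

```python
import argparse


def parse_args(argv):
    """Parse the logger's command-line options (illustrative sketch)."""
    parser = argparse.ArgumentParser(description="MQTT to SQLite data logger")
    parser.add_argument("-b", "--broker", required=True,
                        help="broker name or IP address")
    # action="append" lets -t be given several times.
    parser.add_argument("-t", "--topic", action="append", required=True,
                        help="topic to log; repeat for several topics")
    parser.add_argument("-s", "--store-all", action="store_true",
                        help="log all data, not only changed data")
    # The default client name here is an assumption.
    parser.add_argument("-n", "--name", default="data-logger",
                        help="client name used by the logger")
    return parser.parse_args(argv)


options = parse_args(
    ["-b", "192.168.1.157", "-t", "sensors/#", "-t", "home/#", "-s"])
print(options.broker, options.topic, options.store_all, options.name)
```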

Other Options

JSON data is stored as is and there is no data extraction. The screenshot below shows sample stored data.

[Screenshot: SQLite database containing stored JSON messages]

You can extract values from the data in the message_handler function and store those instead, but you will need to modify the table and the insert command accordingly.

In a similar fashion, the topic in the example above also contains the sensor name, so this could be extracted from the topic and stored in the sensor field.
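A minimal sketch of both ideas is shown here. It assumes that the sensor name is the last level of the topic and that the JSON payload carries the reading under a "temperature" key; the function name, the topic layout and the key are all hypothetical, so adjust them to match your own messages.

```python
import json


def extract_fields(topic, payload):
    """Return (sensor, value) taken from the topic and a JSON payload."""
    # Assumption: the sensor name is the last level of the topic.
    sensor = topic.split("/")[-1]
    try:
        data = json.loads(payload)
    except ValueError:
        # Not JSON: keep the text as is.
        return sensor, payload
    # Assumption: the reading is stored under a "temperature" key.
    if isinstance(data, dict) and "temperature" in data:
        return sensor, data["temperature"]
    return sensor, payload


json_result = extract_fields("sensors/kitchen", '{"temperature": 21.5}')
text_result = extract_fields("sensors/kitchen", "ON")
print(json_result, text_result)
```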

Resources

I’m not a SQLite expert, and my template for this script came from this GitHub script.

This article also helped with dealing with concurrency.

There is a really good collection of tutorials on SQLite here.

15 comments

  1. Hi Steve!
    Thank you for your whole website, there are such wonderful projects on there. It helps me a lot!
    I am using your Python to SQL scripts and managed to log all messages to a SQLite database. However, if I start it with “python3 mqtt-data-logger-sql.py -b broker.com -t /a/sensors -t /b/sensors &” on my rented vserver I get high CPU loads of around 80%.
    I tried to implement a delay of 1 second:
    #loop and wait until interrupted
    try:
        while True:
            #print("loop:-")
            time.sleep(1)
    except KeyboardInterrupt:
        print("interrupted by keyboard")

    But this does not change anything in the CPU load. Do you have an idea on how to fix it?
    Greetings!

      1. The problem is in the log worker. You need to add a small delay and also change time to timestamp. Here is the new bit of code; I have marked the changes with #############
        def log_worker():
            """runs in own thread to log data"""
            #create logger
            logger=SQL_data_logger(db_file)
            logger.drop_table("logs")
            logger.create_table("logs",table_fields)
            while Log_worker_flag:
                time.sleep(0.01) #############
                while not q.empty():
                    data = q.get()
                    if data is None:
                        continue
                    try:
                        timestamp=data["time"] #############
                        topic=data["topic"]
                        message=data["message"]
                        sensor="Dummy-sensor"
                        data_out=[timestamp,topic,sensor,message] #############
                        data_query="INSERT INTO "+ \
                            Table_name +"(time,topic,sensor,message)VALUES(?,?,?,?)"
                        logger.Log_sensor(data_query,data_out)
                    except Exception as e:
                        print("problem with logging ",e)
            logger.conn.close()

        1. Hey,
          thanks for your quick reply. I tried your suggestion, but even with the sleep at 1 sec I get a CPU load of 75%. And I see no change even with a 10 sec pause:
          while Log_worker_flag:
              time.sleep(10)
              print("sleep_loop")
              while not q.empty():
                  data = q.get()
          Do you have any other ideas?
          Greetings!

          1. Hi Steve,
            I cannot answer your comment, that's why I am replying here.
            Even with your file, unmodified, I get an instant CPU load of 80%.
            Do you have any ideas?
            Greetings!

  2. Interesting article on SQLite concurrency, but most of the time you need to write all broker messages into the database, coming from many devices (often at the same time), and usually a “database is locked” error results…
    I don’t see how to handle this with a simple SQLite database.

    1. I understand your point regarding concurrency, and I’m not a SQL expert, but in the case of the logger there is only one user, the logger itself, so it is not an issue.
      I use SQLite as it is easy and available on any system. The code should also work with MySQL if you want to go that route.
      Rgds
      Steve

  3. Steve,

    I have seen code like msg.payload.decode() and msg.payload.translate() but I have not been able to find a good explanation of their use. Can you point me at a suitable resource?

    Jay.

    1. You have to remember that msg.payload is binary. The decode translates the binary string into characters. In the early days this was easy, as ASCII was the standard and so 1 byte = 1 character.
      Take a look at this
      http://www.steves-internet-guide.com/guide-data-character-encoding/
      Obviously you only need to use decode if you know that msg.payload is a character string. If it were an mp3 file then you wouldn’t need decode, as you would keep it in binary.
      This has more detail on decode
      https://www.tutorialspoint.com/python/string_decode.htm
      I have used the translate but again here is a description
      https://www.tutorialspoint.com/python/string_translate.htm
      Hope that helps
      Rgds
      Steve

  4. Some more questions:

    In the code I can see:
    logger=SQL_data_logger(db_file)
    logger.drop_table("logs")
    logger.create_table("logs",table_fields)

    And I also see:
    db_file="logs.db"
    Table_name="logs"

    Does it mean the database will be created in the same directory where the Python scripts are located?

  5. How to start?
    I have never used Python before but I want to use this part to read MQTT data into a database. In the next step I will import from there into SQL Server.
    I installed Python on my Windows 10, but when I call the script I get:

    Traceback (most recent call last):
      File "D:\Git_Repositories\Stromzaehler\MQTT\sql-logger\sql-logger\mqtt-data-logger-sql.py", line 9, in <module>
        import paho.mqtt.client as mqtt
    ModuleNotFoundError: No module named 'paho'

    1. I found something here:
      https://pypi.org/project/paho-mqtt/
      but this does not solve the issue either:
      pip install paho-mqtt
      It installs. When I try to install it again, I get the message:
      Requirement already satisfied: paho-mqtt in c:\users\ggoer\appdata\local\packages\pythonsoftwarefoundation.python.3.7_qbz5n2kfra8p0\localcache\local-packages\python37\site-packages (1.4.0)

      But the error is still the same. I understand that this is not an issue specific to the script. The “import” syntax looks the same as in the example on https://pypi.org/project/paho-mqtt/
