Storing Time Series Data in Python Using TinyFlux DB

Most data in IoT projects is time series data. The traditional database for storing such data is InfluxDB.

However, if you are looking for something simpler and lighter, you can always use SQLite.

SQLite isn’t optimised for time series data, but that usually isn’t a problem with small projects.

However, there is another alternative that I have become aware of: TinyFlux, which is based on the popular TinyDB database but optimised for time series data.

You can find out more about the relationship between TinyDB and TinyFlux in this Medium article written by the author of TinyFlux.

What is TinyFlux

TinyFlux is a pure-Python database package designed for time series data.

MQTT messages are a natural fit for time series databases as the timestamp of the message is generally an important variable, and these messages are typically generated in realtime and often at a high frequency.

TinyFlux and other time series databases are designed for this type of data.  TinyFlux does not require a separate database server, so it is ideal for small, at-home IoT projects.

TinyFlux Basics

The TinyFlux documentation has a good introduction to working with concepts of a time series database, and with the syntax of TinyFlux.

In this tutorial we are primarily concerned with three things:

  1. Creating a database
  2. Inserting MQTT contents into the database
  3. Querying the database for existing MQTT messages

Creating a database

Creating a database is straightforward. Provide the file path for the new database to the TinyFlux constructor:

from tinyflux import TinyFlux

db = TinyFlux('my_new_database.db')

Inserting Data

Individual observations in a TinyFlux database are known as “points”.

Points consist of a timestamp, numeric key/value pairs known as “fields”, and textual key/value pairs known as “tags”.

Use the “insert” method to insert one point, or use the “insert_multiple” method to insert multiple points.

See the “Writing Data” page in the documentation for more information.

from datetime import datetime
from tinyflux import Point

p = Point(
    measurement="air quality",
    time=datetime.fromisoformat("2020-08-28T00:00:00-07:00"),
    tags={"city": "LA"},
    fields={"aqi": 112}
)
db.insert(p)

Or

from datetime import datetime, timezone
from tinyflux import TinyFlux, Point

p1 = Point(
    time=datetime(2021, 11, 7, 21, 0, tzinfo=timezone.utc),
    tags={"name": "Bitcoin", "code": "BTC"},
    fields={"price": 67582.6, "dominance": 0.298}
)
p2 = Point(
    time=datetime(2022, 7, 13, 21, 0, tzinfo=timezone.utc),
    tags={"name": "Bitcoin", "code": "BTC"},
    fields={"price": 19830.9, "dominance": 0.481}
)

# Insert into the DB.
db.insert_multiple([p1, p2])

Querying Data

Querying in TinyFlux resembles the ORM-like syntax of other popular Python database clients and wrappers.

Pass a simple or compound query to the ‘search’ method.  To query timestamps, use a TimeQuery.

To query fields, use a FieldQuery.  To query tags, use a TagQuery.

Combine queries with the ‘&’ (and) or ‘|’ (or) operator.  See the “Querying Data” page in the documentation for more information.

from tinyflux import TagQuery, TimeQuery

# Init new queries.
tags, time = TagQuery(), TimeQuery()
q1 = (tags.code == "BTC")
q2 = (time >= datetime(2020, 1, 1, tzinfo=timezone.utc))

# Pass the queries to TinyFlux's 'search' method.
my_results = db.search(q1 & q2)

Publishing Test Messages

You need an MQTT client to publish messages. The most common is mosquitto_pub, but you will need to install the Mosquitto clients to obtain it.

Alternatives are MQTTBox and MQTTLens which are browser based and easier to use.

In this example, we are publishing messages to the ‘test.mosquitto.org’ host, with the topic of ‘tinyflux_test_topic’.

The message is a JSON encoded string with two key/value pairs, one textual and one numeric.  To publish this test message, you would type the following into a Linux terminal:

mosquitto_pub \
-h test.mosquitto.org \
-t tinyflux_test_topic \
-m "{\"device\":\"thermostat\",\"value\":70.0}"
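On the receiving side, a payload like this has to be split into TinyFlux tags (text) and fields (numbers). The following is a minimal sketch of one way to do that using only the standard library. The helper name split_payload is hypothetical and is not taken from the demo script.

```python
import json


def split_payload(payload: str):
    """Split a JSON object into text tags and numeric fields.

    Hypothetical helper for illustration; not part of TinyFlux.
    """
    data = json.loads(payload)
    tags, fields = {}, {}
    for key, value in data.items():
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            fields[key] = value
        else:
            # Anything non-numeric is stored as text.
            tags[key] = str(value)
    return tags, fields


tags, fields = split_payload('{"device": "thermostat", "value": 70.0}')
print(tags)    # {'device': 'thermostat'}
print(fields)  # {'value': 70.0}
```

The two dictionaries can then be passed as the tags and fields arguments when building a Point.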

Listening for Messages and Inserting into TinyFlux

This single script will run two threads in a single Python process.

The main thread will listen for MQTT messages on a specified host and topic and then place them in a shared inter-thread queue, while the secondary thread will continually pop from this queue and write to TinyFlux.

This logic sequence is laid out in the main() function:

def main():
    """Define main."""
    # Log.
    print(f"Connecting to {MQTT_HOST}... ", flush=True, end="")

A Python process starts with a single thread by default. To initialize the worker (secondary) thread, use the built-in threading module.

The work that is performed by the worker thread is defined in the ‘run_tinyflux_worker’ function.

This function checks the inter-thread queue, pops from the queue if necessary, forms a new TinyFlux point, and inserts the point into the database.

This function definition is passed to the ‘target’ argument of the new thread:

    # Initialize TinyFlux worker thread.
    t = threading.Thread(target=run_tinyflux_worker)

    # Start the worker thread.
    t.start()
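The body of run_tinyflux_worker is not reproduced in this article, so the following is only a sketch of the pattern it describes, written with the standard library. The names message_queue and stored_points are assumptions, and a plain list stands in for the TinyFlux database so that the example runs without any extra packages.

```python
import queue
import threading

message_queue = queue.Queue()    # shared inter-thread queue (assumed name)
exit_event = threading.Event()   # tells the worker when to stop
stored_points = []               # stand-in for the TinyFlux database


def run_worker():
    """Pop messages from the queue until the exit event is set."""
    while not exit_event.is_set():
        try:
            # Wait briefly so the loop can re-check the exit event.
            item = message_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        # The real script would build a Point here and call db.insert().
        stored_points.append(item)
        message_queue.task_done()


worker = threading.Thread(target=run_worker)
worker.start()

# The main thread plays the role of the MQTT listener.
for reading in (70.0, 70.5, 71.0):
    message_queue.put({"device": "thermostat", "value": reading})

message_queue.join()   # block until every queued message has been handled
exit_event.set()       # ask the worker to exit
worker.join()
print(len(stored_points))  # 3
```

Using a timeout on get() rather than a blocking call means the worker notices the exit event within a fraction of a second, which keeps shutdown prompt.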

We define the MQTT client and register the callbacks (“on_connect”, “on_message”) using a helper function, called “initialize_mqtt_client”.

After initializing the client, we start its network loop, which listens for messages, and then connect to the broker:

    # Initialize MQTT client.
    client = initialize_mqtt_client(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE)

    # Start MQTT network loop in a threaded interface to unblock main thread.
    client.loop_start()

    # Connect to the broker.
    client.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE)
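The callbacks themselves are not shown here. As a rough illustration, and assuming the client is paho-mqtt (whose on_message callbacks receive client, userdata and msg, with msg.payload as bytes), an on_message handler might do nothing more than timestamp the message and place it on the shared queue. The queue name and the dictionary layout below are assumptions, and a SimpleNamespace object stands in for a real MQTT message so that the sketch runs without a broker.

```python
import queue
from datetime import datetime, timezone
from types import SimpleNamespace

message_queue = queue.Queue()  # shared with the worker thread (assumed name)


def on_message(client, userdata, msg):
    """Timestamp the message on arrival and hand it to the worker's queue."""
    message_queue.put({
        "time": datetime.now(timezone.utc),
        "topic": msg.topic,
        "payload": msg.payload.decode("utf-8"),
    })


# Simulate one delivery with a stand-in message object.
fake_msg = SimpleNamespace(
    topic="tinyflux_test_topic",
    payload=b'{"device":"thermostat","value":70.0}',
)
on_message(None, None, fake_msg)

item = message_queue.get_nowait()
print(item["topic"], item["payload"])
```

Keeping the callback this small means the network loop is never held up by database writes; all of the slower work happens in the worker thread.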

We run this process indefinitely until a SIGINT signal is sent to the process (usually by pressing Ctrl+C on the keyboard).

We need to clean up the multi-threaded process gracefully upon receiving this signal.

We do this by setting an exit event for the worker thread, joining the worker thread with the main thread, stopping the MQTT client loop, and finally closing the TinyFlux database.

    # Keep this process running until SIGINT received.
    try:
        while True:
            pass
    except KeyboardInterrupt:
        print("\nExiting gracefully... ", flush=True, end="")

        # SIGINT received: set the exit event so the worker thread knows to exit.
        exit_event.set()

        # Await spawned thread.
        t.join()

        # Stop network loop.
        client.loop_stop()

        # Close db.
        db.close()

Note: The major part of this tutorial was written by Justin Fung, the author of TinyFlux.

Demo Script

download


One comment

  1. This tutorial is awesome! Thanks Steve for highlighting one of the use-cases of TinyFlux. If anyone needs any clarifications or assistance, feel free to use the GitHub discussions for the repository, at github.com/citrusvanilla/tinyflux/discussions.

    Happy coding!
