This article presents and demonstrates a new feature delivered in Debezium 2.4: integration with the TimescaleDB database.

TimescaleDB is an open-source database designed to make SQL scalable for time-series data. It is implemented as an extension for the PostgreSQL database, which lets us reuse the standard Debezium PostgreSQL connector and implement TimescaleDB support as a single message transform (SMT).

TimescaleDB provides three basic building blocks/concepts:

  • Hypertables

  • Continuous aggregates

  • Compression

The metadata (catalog) that describes the definitions of these instances, together with the raw data, is typically stored in the _timescaledb_internal schema. The TimescaleDb SMT connects to the database, then reads and processes the metadata. The raw messages read from the database are then enriched with metadata stored in Kafka Connect headers, creating the relation between the physical data and the TimescaleDB logical constructs.

Demonstration

The Debezium examples repository contains a Docker Compose-based deployment that provides a full environment to demonstrate the TimescaleDB integration.

The first step is to start the deployment

$ docker-compose -f docker-compose-timescaledb.yaml up --build

The command will bring up Debezium (Zookeeper, Kafka, Kafka Connect) and the source TimescaleDB database.

The started database is primed with the following database objects:

  • Hypertable conditions representing temperature and humidity measurements as time-series data; created with DDL CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); SELECT create_hypertable('conditions', 'time')

  • A single record of the measurement data (INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3))

  • PostgreSQL publication used to publish time-series data into the replication slot, as the demo uses the pgoutput decoding plugin (CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update'))

The next step is to register the Debezium PostgreSQL connector to capture the changes in the database

$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.yaml

The registration request file differs from the regular one by the addition of these lines

{
    "name": "inventory-connector",
    "config": {
...
        "schema.include.list": "_timescaledb_internal",
        "transforms": "timescaledb",
        "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
        "transforms.timescaledb.database.hostname": "timescaledb",
        "transforms.timescaledb.database.port": "5432",
        "transforms.timescaledb.database.user": "postgres",
        "transforms.timescaledb.database.password": "postgres",
        "transforms.timescaledb.database.dbname": "postgres"
    }
}
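The same additions can also be applied programmatically. The following is a minimal Python sketch, not part of the examples repository: the helper name with_timescaledb_smt and its parameters are hypothetical, and the base connector configuration is left to the caller because the rest of the file is elided above. Only the property names and values shown in the listing are used.

```python
import json


def with_timescaledb_smt(base_config, hostname, port, user, password, dbname):
    """Return a copy of base_config extended with the TimescaleDB options.

    Hypothetical helper for illustration; the property names are the ones
    shown in the registration request above.
    """
    config = dict(base_config)  # do not mutate the caller's dictionary
    prefix = "transforms.timescaledb."
    config.update({
        "schema.include.list": "_timescaledb_internal",
        "transforms": "timescaledb",
        prefix + "type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
        prefix + "database.hostname": hostname,
        prefix + "database.port": str(port),  # connector properties are strings
        prefix + "database.user": user,
        prefix + "database.password": password,
        prefix + "database.dbname": dbname,
    })
    return config


if __name__ == "__main__":
    # The empty base configuration is a placeholder; a real request also
    # carries the usual PostgreSQL connector properties.
    request = {
        "name": "inventory-connector",
        "config": with_timescaledb_smt(
            {}, "timescaledb", 5432, "postgres", "postgres", "postgres"
        ),
    }
    print(json.dumps(request, indent=4))
```

Keeping the SMT options in one helper makes it easier to see which properties are specific to TimescaleDB and which belong to the regular connector configuration.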

Hypertables

The connector captures the internal TimescaleDB schema containing the physical tables with the raw data, and the TimescaleDb SMT is applied to enrich the messages and route them to topics named after the logical names. The SMT configuration options contain the information needed to connect to the database. In this case, the conditions hypertable is physically stored in _timescaledb_internal._hyper_1_1_chunk. When processed by the SMT, its messages are re-routed to the timescaledb.public.conditions topic, whose name consists of the fixed configured prefix timescaledb and the logical name public.conditions, which corresponds to the hypertable name.
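The routing idea can be illustrated with a short Python sketch. It is not the SMT implementation: the dictionary stands in for the catalog lookup that the real SMT performs against the database, and the function name route is hypothetical.

```python
# Stand-in for the TimescaleDB catalog: physical chunk -> logical hypertable.
# The real SMT reads this mapping from the database; the dictionary is an
# assumption made to keep the sketch self-contained.
CHUNK_TO_HYPERTABLE = {
    ("_timescaledb_internal", "_hyper_1_1_chunk"): ("public", "conditions"),
}


def route(chunk_schema, chunk_table, prefix="timescaledb"):
    """Return the logical topic name for a record captured from a chunk."""
    schema, table = CHUNK_TO_HYPERTABLE[(chunk_schema, chunk_table)]
    # Topic name = configured prefix + logical schema + logical table.
    return f"{prefix}.{schema}.{table}"


if __name__ == "__main__":
    print(route("_timescaledb_internal", "_hyper_1_1_chunk"))
```

Every chunk of the same hypertable resolves to the same topic, so consumers only need to know the logical table name.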

Let’s add a few more measurements to the table

$ docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);

and read the captured messages for the topic (printing of key and headers is enabled in the command)

$ docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb.public.conditions

The messages contain two headers debezium_timescaledb_chunk_table:_hyper_1_1_chunk,debezium_timescaledb_chunk_schema:_timescaledb_internal that describe the mapping between the logical hypertable name and the physical source table from which they were captured.
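To inspect these headers in a script, the consumer output can be parsed line by line. The Python sketch below assumes that each output line has the layout headers, key, value separated by tab characters, and that headers are printed as comma-separated name:value pairs as shown above. Both function names are hypothetical, and the parsing would need to be adapted if header values contained commas or colons.

```python
def parse_headers(field):
    """Parse 'name:value,name:value' into a dictionary."""
    headers = {}
    for item in field.split(","):
        # Split on the first colon only; the rest is the header value.
        name, _, value = item.partition(":")
        headers[name] = value
    return headers


def parse_line(line):
    """Split one consumer output line into (headers, key, value).

    Assumes the layout 'headers<TAB>key<TAB>value'.
    """
    header_field, key, value = line.rstrip("\n").split("\t", 2)
    return parse_headers(header_field), key, value


if __name__ == "__main__":
    sample = (
        "debezium_timescaledb_chunk_table:_hyper_1_1_chunk,"
        "debezium_timescaledb_chunk_schema:_timescaledb_internal"
        "\t{}\t{}"
    )
    headers, key, value = parse_line(sample)
    print(headers["debezium_timescaledb_chunk_schema"], headers["debezium_timescaledb_chunk_table"])
```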

Continuous aggregates

Continuous aggregates provide automatic statistical calculations over data that is stored in hypertables. The aggregate is defined as a materialized view that is backed by its own hypertable which in turn is backed by a set of physical tables. After an aggregate is recalculated (either manually or automatically), the new values are stored in the hypertable, from which they can be captured and streamed. The connector captures the new values in the physical tables and the SMT again solves the routing by remapping the physical destination back into the aggregate logical name. Kafka Connect headers with original hypertable and physical table names are added too.

Let’s create a continuous aggregate named conditions_summary that calculates the average, minimum, and maximum temperature per location and time interval

postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
  SELECT
    location,
    time_bucket(INTERVAL '1 hour', time) AS bucket,
    AVG(temperature),
    MAX(temperature),
    MIN(temperature)
  FROM conditions
  GROUP BY location, bucket;

and read the captured messages for the topic

$ docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb.public.conditions_summary

The messages contain two headers debezium_timescaledb_hypertable_table:_materialized_hypertable_2,debezium_timescaledb_hypertable_schema:_timescaledb_internal that expose which backing hypertable was used to store the aggregates and two additional headers debezium_timescaledb_chunk_table:_hyper_2_2_chunk,debezium_timescaledb_chunk_schema:_timescaledb_internal that expose the physical table in which the aggregate was stored.
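For continuous aggregates the resolution has two levels: from the physical chunk to the backing hypertable, and from the backing hypertable to the aggregate. The Python sketch below illustrates this under stated assumptions: the two dictionaries stand in for catalog lookups that the real SMT performs against the database, and the function name route_aggregate is hypothetical. The header names and values are the ones printed above.

```python
# Assumed stand-ins for the catalog; the real SMT queries the database.
INTERNAL_SCHEMA = "_timescaledb_internal"
CHUNK_TO_HYPERTABLE = {"_hyper_2_2_chunk": "_materialized_hypertable_2"}
HYPERTABLE_TO_AGGREGATE = {
    "_materialized_hypertable_2": ("public", "conditions_summary"),
}


def route_aggregate(chunk_table, prefix="timescaledb"):
    """Return (topic, headers) for a record captured from an aggregate chunk."""
    hypertable = CHUNK_TO_HYPERTABLE[chunk_table]          # level 1
    schema, view = HYPERTABLE_TO_AGGREGATE[hypertable]     # level 2
    headers = {
        "debezium_timescaledb_hypertable_table": hypertable,
        "debezium_timescaledb_hypertable_schema": INTERNAL_SCHEMA,
        "debezium_timescaledb_chunk_table": chunk_table,
        "debezium_timescaledb_chunk_schema": INTERNAL_SCHEMA,
    }
    return f"{prefix}.{schema}.{view}", headers


if __name__ == "__main__":
    topic, headers = route_aggregate("_hyper_2_2_chunk")
    print(topic)
    for name, value in headers.items():
        print(f"  {name}: {value}")
```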


If a new measurement is added and aggregate recomputation is triggered, an updated aggregate is emitted to the topic

postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);

that looks like

{
   "schema":{
...
   },
   "payload":{
      "before":null,
      "after":{
         "location":"Ostrava",
         "bucket":"2024-01-09T13:00:00.000000Z",
         "avg":10.0,
         "max":10.0,
         "min":10.0
      },
      "source":{
         "version":"2.5.0.Final",
         "connector":"postgresql",
         "name":"dbserver1",
         "ts_ms":1704806938840,
         "snapshot":"false",
         "db":"postgres",
         "sequence":"[\"29727872\",\"29728440\"]",
         "schema":"public",
         "table":"conditions_summary",
         "txId":764,
         "lsn":29728440,
         "xmin":null
      },
      "op":"c",
      "ts_ms":1704806939163,
      "transaction":null
   }
}
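A downstream consumer typically needs only a few fields from such an event. The following Python sketch shows one way to extract them. The function name summarize is hypothetical, and the sketch assumes that the after field is always present, which holds for the insert and update events published in this demo.

```python
import json


def summarize(event_json):
    """Return the operation, logical view name, and aggregate values."""
    payload = json.loads(event_json)["payload"]
    source = payload["source"]
    after = payload["after"]  # assumed non-null (insert or update event)
    return {
        "op": payload["op"],
        # The SMT has already replaced the physical names with logical ones.
        "view": source["schema"] + "." + source["table"],
        "location": after["location"],
        "bucket": after["bucket"],
        "avg": after["avg"],
        "min": after["min"],
        "max": after["max"],
    }


if __name__ == "__main__":
    event = json.dumps({
        "payload": {
            "after": {
                "location": "Ostrava",
                "bucket": "2024-01-09T13:00:00.000000Z",
                "avg": 10.0, "max": 10.0, "min": 10.0,
            },
            "source": {"schema": "public", "table": "conditions_summary"},
            "op": "c",
        }
    })
    print(summarize(event))
```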

The topic therefore contains two or more messages, with aggregates calculated for different locations.

Compression

The TimescaleDb SMT does not enrich compressed chunks of data (physical table records); they are handled only as a by-product of being stored in a hypertable. The compressed data is captured and stored in the Kafka topic. Typically, messages with compressed chunks are dropped and are not processed by subsequent jobs in the pipeline.

Let’s enable compression for the hypertable and compress it

postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segmentby = 'location');
postgres=# SELECT show_chunks('conditions');
              show_chunks
----------------------------------------
 _timescaledb_internal._hyper_1_1_chunk
(1 row)

postgres=# SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');

The messages are written to the timescaledb._timescaledb_internal._compressed_hypertable_3 topic.
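A downstream job that wants to drop such messages could filter on the topic name. The Python sketch below is one possible approach, not a Debezium feature: the function names are hypothetical, and the name pattern is an assumption inferred from the single topic name shown above.

```python
# Assumed pattern, inferred from the topic name shown in the article.
COMPRESSED_MARKER = "._timescaledb_internal._compressed_hypertable_"


def is_compressed_chunk_topic(topic):
    """Return True if the topic appears to carry compressed chunk data."""
    return COMPRESSED_MARKER in topic


def drop_compressed(records):
    """Yield only the (topic, value) pairs that are not compressed chunks."""
    for topic, value in records:
        if not is_compressed_chunk_topic(topic):
            yield topic, value


if __name__ == "__main__":
    records = [
        ("timescaledb.public.conditions", "{...}"),
        ("timescaledb._timescaledb_internal._compressed_hypertable_3", "{...}"),
    ]
    for topic, _ in drop_compressed(records):
        print(topic)
```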

Tear down the environment

$ docker-compose -f docker-compose-timescaledb.yaml down

Conclusion

In this post, we have demonstrated capturing data from the TimescaleDB time-series database and processing it with the TimescaleDb SMT. We have shown how messages are routed and enriched depending on whether hypertables or continuous aggregates act as the source of data.

Jiri Pechanec

Jiri is a software developer (and a former quality engineer) at Red Hat. He spent most of his career with Java and system integration projects and tasks. He lives near Brno, Czech Republic.

   


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.