In this article, we present and demonstrate a new feature delivered in Debezium 2.4: the integration with the TimescaleDB database.
TimescaleDB is an open-source database designed to make SQL scalable for time-series data. It is implemented as an extension of the PostgreSQL database, which allows us to re-use the standard Debezium PostgreSQL connector and implement TimescaleDB support as a single message transform (SMT).
TimescaleDB provides three basic building blocks/concepts:
- Hypertables
- Continuous aggregates
- Compression
The metadata (catalog) that describes the definitions of these constructs, as well as the raw data, is typically stored in the _timescaledb_internal schema. The TimescaleDb SMT connects to the database and reads and processes this metadata. The raw messages read from the database are then enriched with the metadata, stored in Kafka Connect headers, creating the relation between the physical data and the TimescaleDB logical constructs.
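For illustration, the kind of mapping the SMT resolves can also be inspected directly in the database through TimescaleDB's public metadata views (a minimal example, assuming TimescaleDB 2.x):
-- Chunk-to-hypertable mapping maintained by TimescaleDB;
-- the SMT resolves an equivalent relation when it enriches and re-routes messages
SELECT hypertable_schema, hypertable_name, chunk_schema, chunk_name
FROM timescaledb_information.chunks;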
Demonstration
The Debezium examples repository contains a Docker Compose-based deployment that provides a full environment to demonstrate the TimescaleDB integration.
The first step is to start the deployment
$ docker-compose -f docker-compose-timescaledb.yaml up --build
The command will bring up Debezium (Zookeeper, Kafka, Kafka Connect) and the source TimescaleDB database.
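Optionally, you can verify that all services are up before continuing:
$ docker-compose -f docker-compose-timescaledb.yaml ps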
The started database is primed with the following database objects:
- Hypertable conditions representing temperature and humidity measurements as time-series data, created with the DDL CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); SELECT create_hypertable('conditions', 'time'); see the verification query after this list
- A single record of measurement data (INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3))
- A PostgreSQL publication used to publish the time-series data into the replication slot, as the demo uses the pgoutput decoding plugin (CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update'))
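To verify that the hypertable was created, you can connect to the database and query TimescaleDB's metadata view (the view and column names assume TimescaleDB 2.x):
$ docker-compose -f docker-compose-timescaledb.yaml exec timescaledb bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# SELECT hypertable_name, num_chunks FROM timescaledb_information.hypertables;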
In the next step, it is necessary to register the Debezium PostgreSQL connector to capture the changes in the database
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.json
The registration request differs from a regular PostgreSQL connector registration by the addition of these lines
{
"name": "inventory-connector",
"config": {
...
"schema.include.list": "_timescaledb_internal",
"transforms": "timescaledb",
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
"transforms.timescaledb.database.hostname": "timescaledb",
"transforms.timescaledb.database.port": "5432",
"transforms.timescaledb.database.user": "postgres",
"transforms.timescaledb.database.password": "postgres",
"transforms.timescaledb.database.dbname": "postgres"
}
}
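Once registered, you can optionally check that the connector is running by querying the Kafka Connect REST API:
$ curl -s http://localhost:8083/connectors/inventory-connector/status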
Hypertables
The connector will capture the internal TimescaleDB schema with the physical tables containing the raw data, and the TimescaleDb SMT will be applied to enrich the messages and route them to correctly named topics based on their logical names. The SMT configuration options contain the information needed to connect to the database. In this case, the conditions hypertable is physically stored in _timescaledb_internal._hyper_1_1_chunk and, when processed by the SMT, its messages are re-routed to the timescaledb.public.conditions topic, which is named according to the configured prefix timescaledb and the logical name public.conditions that corresponds to the hypertable name.
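To confirm the routing, you can list the topics present in the broker (assuming the kafka-topics.sh utility is available under /kafka/bin in the Kafka container, like the console consumer used below):
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-topics.sh \
    --bootstrap-server kafka:9092 \
    --list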
Let’s add a few more measurements to the table
$ docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);
and read the captured messages for the topic (printing of key and headers is enabled in the command)
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property print.headers=true \
--topic timescaledb.public.conditions
The messages contain two headers, __debezium_timescaledb_chunk_table:_hyper_1_1_chunk and __debezium_timescaledb_chunk_schema:_timescaledb_internal, that describe the mapping between the logical hypertable name and the physical source table from which the messages were captured.
Continuous aggregates
Continuous aggregates provide automatic statistical calculations over data stored in hypertables. An aggregate is defined as a materialized view that is backed by its own hypertable, which in turn is backed by a set of physical tables. After an aggregate is recalculated (either manually or automatically), the new values are stored in the hypertable, from which they can be captured and streamed. The connector captures the new values in the physical tables, and the SMT again handles the routing by remapping the physical destination back to the aggregate's logical name. Kafka Connect headers with the original hypertable and physical table names are added as well.
Let’s create a continuous aggregate named conditions_summary that calculates the average, minimum, and maximum temperature per location and time interval
postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
SELECT
location,
time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY location, bucket;
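You can also query the aggregate view directly in psql to see the computed buckets:
postgres=# SELECT location, bucket, avg, max, min FROM conditions_summary;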
Then read the captured messages for the topic
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property print.headers=true \
--topic timescaledb.public.conditions_summary
The messages contain two headers, __debezium_timescaledb_hypertable_table:_materialized_hypertable_2 and __debezium_timescaledb_hypertable_schema:_timescaledb_internal, that expose which backing hypertable was used to store the aggregates, and two additional headers, __debezium_timescaledb_chunk_table:_hyper_2_2_chunk and __debezium_timescaledb_chunk_schema:_timescaledb_internal, that expose the physical table in which the aggregate was stored.
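The header values can be cross-checked against TimescaleDB's metadata, which records which materialization hypertable backs the aggregate (the view and column names assume TimescaleDB 2.x):
postgres=# SELECT view_name, materialization_hypertable_schema, materialization_hypertable_name
           FROM timescaledb_information.continuous_aggregates;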
If a new measurement is added and aggregate recomputation is triggered, an updated aggregate is emitted to the topic
postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);
that looks like
{
"schema":{
...
},
"payload":{
"before":null,
"after":{
"location":"Ostrava",
"bucket":"2024-01-09T13:00:00.000000Z",
"avg":10.0,
"max":10.0,
"min":10.0
},
"source":{
"version":"2.5.0.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1704806938840,
"snapshot":"false",
"db":"postgres",
"sequence":"[\"29727872\",\"29728440\"]",
"schema":"public",
"table":"conditions_summary",
"txId":764,
"lsn":29728440,
"xmin":null
},
"op":"c",
"ts_ms":1704806939163,
"transaction":null
}
}
The topic thus contains multiple messages, calculated per location and time bucket.
Compression
The TimescaleDb SMT does not provide any special enrichment for compressed chunks of data (physical table records); they are captured only as a by-product of being stored in a hypertable. The compressed data is captured and written to the Kafka topic as-is. Typically, messages with compressed chunks are dropped and are not processed by subsequent jobs in the pipeline.
Let’s enable compression for the hypertable and compress it
postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segmentby = 'location');
postgres=# SELECT show_chunks('conditions');
show_chunks
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
(1 row)
postgres=# SELECT compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');
Messages are written to the timescaledb._timescaledb_internal._compressed_hypertable_3 topic.
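If you want to inspect these records, they can be consumed in the same way as the other topics:
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.headers=true \
    --topic timescaledb._timescaledb_internal._compressed_hypertable_3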
Tear down the environment
docker-compose -f docker-compose-timescaledb.yaml down
Conclusion
In this post, we have demonstrated capturing data from the TimescaleDB time-series database and processing it with the TimescaleDb SMT. We have shown how messages are routed and enriched depending on whether hypertables or continuous aggregates act as the source of the data.
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.