When it comes to replicating operational data for analytics, Change Data Capture (CDC) is the gold standard. It offers scalability, near real-time performance, and captures all data modifications, ensuring your analytical datasets are always up-to-date. Debezium is a leading tool in this space, connecting to a wide range of databases and exporting CDC events in various formats like JSON and Avro, making integration with diverse systems a breeze.
While Debezium itself is a Java-based project, the data engineering world increasingly relies on Python. This blog post demonstrates how to leverage Debezium within a Python environment, using pydbzengine. We’ll explore how to use these technologies to build a robust and scalable CDC solution.
Python-Powered CDC Pipeline
Capturing Change Data with Debezium and Loading into DuckDB using DLT
This blog post demonstrates how to use Debezium to capture change data from a PostgreSQL database and load it into a DuckDB database using the Data Load Tool (DLT). All of this is achieved through a Python-based data processing pipeline using pydbzengine.
We’ll walk through the code, explaining the key components and how they work together.
Understanding the Components
-
Debezium: A powerful open-source platform for change data capture. It monitors database transaction logs and produces change event streams, indicating inserts, updates, and deletes.
-
pydbzengine: A Python library that provides a convenient way to interact with the Debezium embedded engine. It simplifies the process of configuring and running Debezium within a Python application.
-
DLT: A versatile data loading tool that simplifies the process of extracting, and loading data into various destinations. In this example, we use DLT to load the change events from Debezium into DuckDB.
-
DuckDB: An embeddable analytical database that provides efficient data processing capabilities. It’s an excellent choice for local development and testing.
-
Testcontainers: A library for spinning up throwaway instances of services, such as databases, for testing. We use it to manage the PostgreSQL database during the example.
Code Breakdown
The provided code demonstrates a complete CDC pipeline in python, from capturing changes with Debezium to loading them into DuckDB with DLT. Let’s break down the key parts:
1. Setting up the Environment
Let’s begin with setting up the necessary environment, including defining file paths, cleaning up previous runs, and defining a helper class DbPostgresql
that uses Testcontainers to manage the PostgreSQL database, which acts as our operational data source.
import os
from pathlib import Path
import dlt
import duckdb
from testcontainers.core.config import testcontainers_config
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.postgres import PostgresContainer
from pydbzengine import DebeziumJsonEngine, Properties
from pydbzengine.debeziumdlt import DltChangeHandler
from pydbzengine.helper import Utils
# set global variables
CURRENT_DIR = Path(__file__).parent
DUCKDB_FILE = CURRENT_DIR.joinpath("dbz_cdc_events_example.duckdb")
OFFSET_FILE = CURRENT_DIR.joinpath('postgresql-offsets.dat')
# cleanup
if OFFSET_FILE.exists():
os.remove(OFFSET_FILE)
if DUCKDB_FILE.exists():
os.remove(DUCKDB_FILE)
def wait_for_postgresql_to_start(self) -> None:
wait_for_logs(self, ".*database system is ready to accept connections.*")
wait_for_logs(self, ".*PostgreSQL init process complete.*")
class DbPostgresql:
POSTGRES_USER = "postgres"
POSTGRES_PASSWORD = "postgres"
POSTGRES_DBNAME = "postgres"
POSTGRES_IMAGE = "debezium/example-postgres:3.0.0.Final"
POSTGRES_HOST = "localhost"
POSTGRES_PORT_DEFAULT = 5432
CONTAINER: PostgresContainer = (PostgresContainer(image=POSTGRES_IMAGE,
port=POSTGRES_PORT_DEFAULT,
username=POSTGRES_USER,
password=POSTGRES_PASSWORD,
dbname=POSTGRES_DBNAME,
)
.with_exposed_ports(POSTGRES_PORT_DEFAULT)
)
PostgresContainer._connect = wait_for_postgresql_to_start
def start(self):
testcontainers_config.ryuk_disabled = True
print("Starting Postgresql Db...")
self.CONTAINER.start()
def stop(self):
print("Stopping Postgresql Db...")
self.CONTAINER.stop()
def __exit__(self, exc_type, exc_value, traceback):
self.stop()
2. Configuring Debezium
We configure Debezium by creating a Java Properties
object. This object holds the configuration settings for the Debezium engine. It includes database connection details, the connector class (PostgresConnector), offset storage, and transformations. The transforms
property is used to unwrap the Debezium message, simplifying downstream processing.
def debezium_engine_props(sourcedb: DbPostgresql):
props = Properties()
props.setProperty("name", "engine")
props.setProperty("snapshot.mode", "initial_only")
props.setProperty("database.hostname", sourcedb.CONTAINER.get_container_host_ip())
props.setProperty("database.port",
sourcedb.CONTAINER.get_exposed_port(sourcedb.POSTGRES_PORT_DEFAULT))
props.setProperty("database.user", sourcedb.POSTGRES_USER)
props.setProperty("database.password", sourcedb.POSTGRES_PASSWORD)
props.setProperty("database.dbname", sourcedb.POSTGRES_DBNAME)
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
props.setProperty("offset.storage.file.filename", OFFSET_FILE.as_posix())
props.setProperty("max.batch.size", "5")
props.setProperty("poll.interval.ms", "10000")
props.setProperty("converter.schemas.enable", "false")
props.setProperty("offset.flush.interval.ms", "1000")
props.setProperty("database.server.name", "testc")
props.setProperty("database.server.id", "1234")
props.setProperty("topic.prefix", "testc")
props.setProperty("schema.whitelist", "inventory")
props.setProperty("database.whitelist", "inventory")
props.setProperty("table.whitelist", "inventory.*")
props.setProperty("replica.identity.autoset.values", "inventory.*:FULL")
# // debezium unwrap message
props.setProperty("transforms", "unwrap")
props.setProperty("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
props.setProperty("transforms.unwrap.add.fields", "op,table,source.ts_ms,sourcedb,ts_ms")
props.setProperty("transforms.unwrap.delete.handling.mode", "rewrite")
# props.setProperty("debezium.transforms.unwrap.drop.tombstones", "true")
return props
3. Change Handler Implementation
The DltChangeHandler
class, provided by the pydbzengine library, acts as the bridge between Debezium and DLT. It receives change data capture events from Debezium and then leverages the DLT pipeline to efficiently load this data into your chosen destination, such as DuckDB. Essentially, it’s the component that connects the real-time change data stream from Debezium to the data loading capabilities of DLT. You can find the full implementation in the pydbzengine repository.
Similarly, a custom consuming logic could be implemented, just by implementing the handleJsonBatch
method. This allows you to implement custom processing logic and consuming data to various destinations or services.
from pydbzengine import BasePythonChangeHandler, ChangeEvent
class MyXYZChangeHandler(BasePythonChangeHandler):
def handleJsonBatch(self, records: List[ChangeEvent]):
# Process your data here!
for record in records:
# ... your processing logic ...
# Example: send data to another service, database, etc.
4. Running the Debezium Engine and DLT Pipeline
The main
function orchestrates the entire process. It starts the PostgreSQL container, creates the Debezium engine with the configured properties and the DltChangeHandler
, and starts the engine.
def main():
# Start the PostgreSQL container that will serve as the replication source.
sourcedb = DbPostgresql()
sourcedb.start()
# Get Debezium engine configuration properties
props = debezium_engine_props(sourcedb=sourcedb)
# Create a dlt pipeline to consume the change events into DuckDB.
dlt_pipeline = dlt.pipeline(
pipeline_name="dbz_cdc_events_example",
destination="duckdb",
dataset_name="dbz_data"
)
handler = DltChangeHandler(dlt_pipeline=dlt_pipeline)
engine = DebeziumJsonEngine(properties=props, handler=handler)
# Run the Debezium engine asynchronously with a timeout. This allows the example
# to run for a limited time and then terminate automatically.
Utils.run_engine_async(engine=engine, timeout_sec=60)
# engine.run() # This would be used for synchronous execution (without timeout)
if __name__ == "__main__":
main()
5. Querying the DuckDB Database, The result
After the Debezium engine has run for a specified time (60 seconds in the example), we can connect to destination(DuckDB) database and display the loaded data.
con = duckdb.connect(DUCKDB_FILE.as_posix())
result = con.sql("SHOW ALL TABLES").fetchall()
for r in result:
database, schema, table = r[:3] # Extract database, schema, and table names.
if schema == "dbz_data": # Only show data from the schema where Debezium loaded the data.
print(f"Data in table {table}:")
con.sql(f"select * from {database}.{schema}.{table} limit 5").show() # Display table data
Consumed data:
┌────────────────────┬────────────────────────┬────────┬───────────────────────────────┬──────────────────────────────────────────────┐
│ load_id │ schema_name │ status │ inserted_at │ schema_version_hash │
│ varchar │ varchar │ int64 │ timestamp with time zone │ varchar │
├────────────────────┼────────────────────────┼────────┼───────────────────────────────┼──────────────────────────────────────────────┤
│ 1738405897.413279 │ debezium_source_events │ 0 │ 2025-02-01 11:31:38.086127+01 │ Q5UNIOd7gJ6ljH5qfKKcO7yWwPvNESKW+mVXJmx9geg= │
│ 1738405898.176148 │ debezium_source_events │ 0 │ 2025-02-01 11:31:39.381275+01 │ OyUXGP6PvFQuUTPnPdvESnsEqpFAxivJoP+l0G6l4+M= │
│ 1738405899.4865642 │ debezium_source_events │ 0 │ 2025-02-01 11:31:39.704015+01 │ jqZNcnJXF/33Va2kRWgKOZF4RnZSVgYxMDhFep8+Jg8= │
│ 1738405899.775917 │ debezium_source_events │ 0 │ 2025-02-01 11:31:39.952311+01 │ jqZNcnJXF/33Va2kRWgKOZF4RnZSVgYxMDhFep8+Jg8= │
│ 1738405900.0213661 │ debezium_source_events │ 0 │ 2025-02-01 11:31:40.223125+01 │ uMZY5n2NGPecXvVQIePLEg2nZQcAlkoWAXDLALKjWuQ= │
└────────────────────┴────────────────────────┴────────┴───────────────────────────────┴──────────────────────────────────────────────┘
Data in table _dlt_pipeline_state:
┌─────────┬────────────────┬──────────────────────┬──────────────────────┬──────────────────────┬────────────────────────────────────────────┬───────────────────┬────────────────┐
│ version │ engine_version │ pipeline_name │ state │ created_at │ version_hash │ _dlt_load_id │ _dlt_id │
│ int64 │ int64 │ varchar │ varchar │ timestamp with tim… │ varchar │ varchar │ varchar │
├─────────┼────────────────┼──────────────────────┼──────────────────────┼──────────────────────┼────────────────────────────────────────────┼───────────────────┼────────────────┤
│ 1 │ 4 │ dbz_cdc_events_exa… │ eNp1j0FLw0AQhf/LXg… │ 2025-02-01 11:31:3… │ ZvlGi9hyfXjD2b0imkL9ZA7x3S1/YkmQK4QbA+Jw… │ 1738405897.413279 │ hNbs3TIc3vRHvA │
└─────────┴────────────────┴──────────────────────┴──────────────────────┴──────────────────────┴────────────────────────────────────────────┴───────────────────┴────────────────┘
Data in table _dlt_version:
┌─────────┬────────────────┬──────────────────────┬──────────────────────┬──────────────────────┬─────────────────────────────────────────────────────────────────────────────────┐
│ version │ engine_version │ inserted_at │ schema_name │ version_hash │ schema │
│ int64 │ int64 │ timestamp with tim… │ varchar │ varchar │ varchar │
├─────────┼────────────────┼──────────────────────┼──────────────────────┼──────────────────────┼─────────────────────────────────────────────────────────────────────────────────┤
│ 2 │ 11 │ 2025-02-01 11:31:3… │ debezium_source_ev… │ Q5UNIOd7gJ6ljH5qfK… │ {"version":2,"version_hash":"Q5UNIOd7gJ6ljH5qfKKcO7yWwPvNESKW+mVXJmx9geg=","e… │
│ 4 │ 11 │ 2025-02-01 11:31:3… │ debezium_source_ev… │ OyUXGP6PvFQuUTPnPd… │ {"version":4,"version_hash":"OyUXGP6PvFQuUTPnPdvESnsEqpFAxivJoP+l0G6l4+M=","e… │
│ 6 │ 11 │ 2025-02-01 11:31:3… │ debezium_source_ev… │ jqZNcnJXF/33Va2kRW… │ {"version":6,"version_hash":"jqZNcnJXF/33Va2kRWgKOZF4RnZSVgYxMDhFep8+Jg8=","e… │
│ 8 │ 11 │ 2025-02-01 11:31:4… │ debezium_source_ev… │ uMZY5n2NGPecXvVQIe… │ {"version":8,"version_hash":"uMZY5n2NGPecXvVQIePLEg2nZQcAlkoWAXDLALKjWuQ=","e… │
└─────────┴────────────────┴──────────────────────┴──────────────────────┴──────────────────────┴─────────────────────────────────────────────────────────────────────────────────┘
Data in table testc_inventory_customers:
┌───────┬────────────┬───────────┬───────────────────────┬─────────┬─────────┬───────────┬───────────────┬───────────────┬───────────────────┬────────────────┐
│ id │ first_name │ last_name │ email │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├───────┼────────────┼───────────┼───────────────────────┼─────────┼─────────┼───────────┼───────────────┼───────────────┼───────────────────┼────────────────┤
│ 1001 │ Sally │ Thomas │ sally.thomas@acme.com │ false │ r │ customers │ 1738405883186 │ 1738405896858 │ 1738405897.413279 │ KcWKrYODYJ859w │
│ 1002 │ George │ Bailey │ gbailey@foobar.com │ false │ r │ customers │ 1738405883186 │ 1738405896862 │ 1738405897.413279 │ JU6dR1S27Xt3QA │
│ 1003 │ Edward │ Walker │ ed@walker.com │ false │ r │ customers │ 1738405883186 │ 1738405896862 │ 1738405897.413279 │ 02kMVvIX2/aGGg │
│ 1004 │ Anne │ Kretchmar │ annek@noanswer.org │ false │ r │ customers │ 1738405883186 │ 1738405896862 │ 1738405897.413279 │ TI7jpxl9FD2kRQ │
└───────┴────────────┴───────────┴───────────────────────┴─────────┴─────────┴───────────┴───────────────┴───────────────┴───────────────────┴────────────────┘
Data in table testc_inventory_geom:
┌───────┬──────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬───────────────┬───────────────┬───────────────────┬────────────────┐
│ id │ g__wkb │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ varchar │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├───────┼──────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼───────────────┼───────────────┼───────────────────┼────────────────┤
│ 1 │ AQEAAAAAAAAAAADwPwAAAAAAAPA/ │ false │ r │ geom │ 1738405883186 │ 1738405896872 │ 1738405897.413279 │ 17snqevSVWL0xA │
│ 2 │ AQIAAAACAAAAAAAAAAAAAEAAAAAAAADwPwAAAAAAABhAAAAAAAAAGEA= │ false │ r │ geom │ 1738405883186 │ 1738405896872 │ 1738405898.176148 │ W4kfG5n5jYhy3w │
│ 3 │ AQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAFEAAAAAAAAAAQAAAAAAAABRAAAAAAA… │ false │ r │ geom │ 1738405883186 │ 1738405896872 │ 1738405898.176148 │ 40HrbnruXZaB/g │
└───────┴──────────────────────────────────────────────────────────────────────┴─────────┴─────────┴─────────┴───────────────┴───────────────┴───────────────────┴────────────────┘
Data in table testc_inventory_orders:
┌───────┬────────────┬───────────┬──────────┬────────────┬─────────┬─────────┬─────────┬───────────────┬───────────────┬────────────────────┬────────────────┐
│ id │ order_date │ purchaser │ quantity │ product_id │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ int64 │ int64 │ int64 │ int64 │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├───────┼────────────┼───────────┼──────────┼────────────┼─────────┼─────────┼─────────┼───────────────┼───────────────┼────────────────────┼────────────────┤
│ 10001 │ 16816 │ 1001 │ 1 │ 102 │ false │ r │ orders │ 1738405883186 │ 1738405896876 │ 1738405898.176148 │ X7ejebZDxmm+hw │
│ 10002 │ 16817 │ 1002 │ 2 │ 105 │ false │ r │ orders │ 1738405883186 │ 1738405896876 │ 1738405898.176148 │ 6LU0Fe9UVE3XFQ │
│ 10003 │ 16850 │ 1002 │ 2 │ 106 │ false │ r │ orders │ 1738405883186 │ 1738405896876 │ 1738405898.176148 │ 0OIBPdMzqjLh0w │
│ 10004 │ 16852 │ 1003 │ 1 │ 107 │ false │ r │ orders │ 1738405883186 │ 1738405896876 │ 1738405899.4865642 │ CcY6FKlHLQ6mPg │
└───────┴────────────┴───────────┴──────────┴────────────┴─────────┴─────────┴─────────┴───────────────┴───────────────┴────────────────────┴────────────────┘
Data in table testc_inventory_products:
┌───────┬────────────────────┬──────────────────────────────────────┬────────┬─────────┬─────────┬──────────┬───────────────┬───────────────┬────────────────────┬────────────────┐
│ id │ name │ description │ weight │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ varchar │ varchar │ double │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├───────┼────────────────────┼──────────────────────────────────────┼────────┼─────────┼─────────┼──────────┼───────────────┼───────────────┼────────────────────┼────────────────┤
│ 101 │ scooter │ Small 2-wheel scooter │ 3.14 │ false │ r │ products │ 1738405883186 │ 1738405896879 │ 1738405899.4865642 │ aOf6efrtt48+1Q │
│ 102 │ car battery │ 12V car battery │ 8.1 │ false │ r │ products │ 1738405883186 │ 1738405896880 │ 1738405899.4865642 │ kUuPhtKUAsTUaA │
│ 103 │ 12-pack drill bits │ 12-pack of drill bits with sizes r… │ 0.8 │ false │ r │ products │ 1738405883186 │ 1738405896880 │ 1738405899.4865642 │ evSpPy68nldtbg │
│ 104 │ hammer │ 12oz carpenter's hammer │ 0.75 │ false │ r │ products │ 1738405883186 │ 1738405896880 │ 1738405899.4865642 │ lCpa9yyHSm8xqA │
│ 105 │ hammer │ 14oz carpenter's hammer │ 0.875 │ false │ r │ products │ 1738405883186 │ 1738405896880 │ 1738405899.775917 │ VXcDU/tw/zT2fw │
└───────┴────────────────────┴──────────────────────────────────────┴────────┴─────────┴─────────┴──────────┴───────────────┴───────────────┴────────────────────┴────────────────┘
Data in table testc_inventory_products_on_hand:
┌────────────┬──────────┬─────────┬─────────┬──────────────────┬───────────────┬───────────────┬────────────────────┬────────────────┐
│ product_id │ quantity │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ int64 │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├────────────┼──────────┼─────────┼─────────┼──────────────────┼───────────────┼───────────────┼────────────────────┼────────────────┤
│ 101 │ 3 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ 90r3+XR7PH7y6g │
│ 102 │ 8 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ 5F+LUMVYO3I2wQ │
│ 103 │ 18 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ SguX65iX7ffyJg │
│ 104 │ 4 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ Vj/N2j0bN3ipzw │
│ 105 │ 5 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ z31M4RIQPpq3BA │
└────────────┴──────────┴─────────┴─────────┴──────────────────┴───────────────┴───────────────┴────────────────────┴────────────────┘
Testing it yourself
You can see complete example and start experimenting using dlt_consuming.
To run this example, you’ll need to have Docker Desktop installed and the required Python libraries. You can install the dependencies using:
pip install pydbzengine[dev]
python dlt_consuming.py
Key Takeaways
This example demonstrates a robust and simple way to capture change data from a database and load it into a data warehouse using Debezium and DLT. The combination of these tools provides a powerful and simple solution for CDC scenarios, enabling real-time data synchronization and analysis.
The use of Python and pydbzengine makes it easy to integrate Debezium into existing Python workflows.
The DltChangeHandler
provides a clean separation of concerns, handling the integration with DLT and the data loading process.
Wrap-Up and Contributions
Based on Debezium, pydbzengine makes it very simple to set up a low-latency data ingestion pipeline using python. The project completely open-source, using the Apache 2.0 license. pydbzengine still is a young project and there are things to improve. Please feel free to test it, give feedback, open feature requests or send pull requests.
Happy data engineering…
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve ours existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.