Streaming data to a downstream database

In this blog post we will create a simple streaming data pipeline to continuously capture the changes in a MySQL database and replicate them in near real-time into a PostgreSQL database. We’ll show how to do this without writing any code, but instead by using and configuring Kafka Connect, the Debezium MySQL source connector, the Confluent JDBC sink connector, and a few single message transforms (SMTs).

This approach of replicating data through Kafka is really useful on its own, but it becomes even more advantageous when we can combine our near real-time streams of data changes with other streams, connectors, and stream processing applications. A recent Confluent blog post series shows a similar streaming data pipeline but using different connectors and SMTs. What’s great about Kafka Connect is that you can mix and match connectors to move data between multiple systems.

We will also demonstrate a new functionality that was released with Debezium 0.6.0: a single message transform for CDC Event Flattening.


The general topology for this scenario is displayed on the following picture:

Scenario topology
Figure 1: A General topology


To simplify the setup a little bit, we will use only one Kafka Connect instance that will contain all connectors. I.e. this instance will serve as an event producer and an event consumer:


Scenario topology
Figure 2: A Simplified topology


We will use this compose for a fast deployment of the demo. The deployment consists of following Docker images:

The Debezium MySQL Connector was designed to specifically capture database changes and provide as much information as possible about those events beyond just the new state of each row. Meanwhile, the Confluent JDBC Sink Connector was designed to simply convert each message into a database insert/upsert based upon the structure of the message. So, the two connectors have different structures for the messages, but they also use different topic naming conventions and behavior of representing deleted records.

These mismatches in structure and behavior will be common when using connectors that were not designed to work together. But this is something that we can easily deal with, and we discuss how in the next few sections.

Event format

Debezium emits events in a complex format that contains all of the information about the captured data change: the type of operation, source metadata, the timestamp the event was processed by the connector, and state of the row before and after the change was made. Debezium calls this structure an "envelope":

	"op": "u",
	"source": {
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"

Many other Kafka Connect source connectors don’t have the luxury of knowing this much about the changes, and instead use a simpler model where each message directly represents the after state of the row. This is also what many sink connectors expect, and the Confluent JDBC Sink Connector is not different:

	"field1" : "newvalue1",
	"field2" : "newvalue2"

While we think it’s actually a great thing that Debezium CDC connectors provide as much detail as possible, we also make it easy for you to transform Debezium’s "envelope" format into the "row" format that is expected by many other connectors. Debezium provides a bridge between those two formats in a form of a single message transform. The UnwrapFromEnvelope transformation automatically extracts a new row record and thus effectively flattens the complex record into a simple one consumable by other connectors.

You can use this SMT on the source connector to transform the message before it is written to Kafka, or you can instead store the source connector’s richer "envelope" form of the message in Kafka and use this SMT on the sink connector to transform the message after it is read from Kafka and before it is passed to the sink connector. Both options work, and it just depends on whether you find the envelope form of the message useful for other purposes.

In our example we apply the SMT at the sink connector using these configuration properties:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

Delete records

When the Debezium connector detects a row is deleted, it creates two event messages: a delete event and a tombstone message. The delete message has an envelope with the state of the deleted row in the before field, and an after field that is null. The tombstone message contains same key as the delete message, but the entire message value is null, and Kafka’s log compaction utilizes this to know that it can remove any earlier messages with the same key. A number of sink connectors, including the Confluent’s JDBC Sink Connector, are not expecting these messages and will instead fail if they see either kind of message. The UnwrapFromEnvelope SMT will by default filter out both delete and tombstone records, though you can change this if you’re using the SMT and want to keep one or both of these kinds of messages.

Topic naming

Last but not least there is a difference in naming of topics. Debezium uses fully qualified naming for target topics representing each table it manages. The naming follows the pattern <logical-name>.<database-name>.<table-name>. Kafka Connect JDBC Connector works with simple names <table-name>.

In more complex scenarios the user may deploy the Kafka Streams framework to establish elaborated routing between source and target routes. In our example we will use a stock RegexRouter SMT that would route records created by Debezium into topics named according to JDBC Connector schema. Again, we could use this SMT in either the source or sink connectors, but for this example we’re going to use it in the source connector so we can choose the names of the Kafka topics where the records will be written.

"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"


Kick the tires and let’s try our example!

First of all we need to deploy all components.

docker-compose up

When all components are started we are going to register the JDBC Sink connector writing into PostgreSQL database:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

Using this registration request:

    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",                                                  (1)
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",   (1)
        "auto.create": "true",                                                   (2)
        "insert.mode": "upsert",                                                 (3)
        "pk.fields": "id",                                                       (4)
        "pk.mode": "record_value"                                                (4)

The request configures these options:

  1. unwrapping Debezium’s complex format into a simple one

  2. automatically create target tables

  3. insert a row if it does not exist or update an existing one

  4. identify the primary key stored in Kafka’s record value field

Then the source connector must be set up:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

Using this registration request:

    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "": "184054",
        "": "dbserver1",                                         (1)
        "database.whitelist": "inventory",                                           (2)
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "route",                                                       (3)
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",  (3)
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",                     (3)
        "transforms.route.replacement": "$3"                                         (3)

The request configures these options:

  1. logical name of the database

  2. the database we want to monitor

  3. an SMT which defines a regular expression matching the topic name <logical-name>.<database-name>.<table-name> and extracts the third part of it as the final topic name

Let’s check if the databases are synchronized. All the rows of the customers table should be found in the source database (MySQL) as well as the target database (Postgres):

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
| id   | first_name | last_name | email                 |
| 1001 | Sally      | Thomas    | |
| 1002 | George     | Bailey    |    |
| 1003 | Edward     | Walker    |         |
| 1004 | Anne       | Kretchmar |    |

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
 Thomas    | 1001 | Sally      |
 Bailey    | 1002 | George     |
 Walker    | 1003 | Edward     |
 Kretchmar | 1004 | Anne       |

With the connectors still running, we can add a new row to the MySQL database and then check that it was replicated into the PostgreSQL database:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', '');
Query OK, 1 row affected (0.02 sec)

docker-compose exec -postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
Doe        | 1005 | John       |
(5 rows)


We set up a simple streaming data pipeline to replicate data in near real-time from a MySQL database to a PostgreSQL database. We accomplished this using Kafka Connect, the Debezium MySQL source connector, the Confluent JDBC sink connector, and a few SMTs — all without having to write any code. And since it is a streaming system, it will continue to capture all changes made to the MySQL database and replicating them in near real time.

What’s next?

In a future blog post we will reproduce the same scenario with Elasticsearch as a target for events.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve ours existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.

back to top