

Querying Debezium Change Data Events With KSQL

Last year saw the inception of a new open-source project in the Apache Kafka universe: KSQL, a streaming SQL engine built on top of Kafka Streams. In this post, we are going to try out KSQL querying with data change events generated by Debezium from a MySQL database.

As a source of data we will use the database and setup from our tutorial. The result of this exercise should be similar to the recent post about aggregation of events into domain-driven aggregates.

Entity diagram

First let’s look at the entities in the database and the relations between them.

Entity diagram
Figure 1: Entity diagram of the example entities

 

The picture above shows the full ER diagram for the inventory database in the example MySQL instance. We are going to focus on two entities:

  • customers - the list of customers in the system

  • orders - the list of orders in the system

There is a 1:n relation between customers and orders, modelled by the purchaser column in the orders table, which is a foreign key to the customers table.
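
As a quick reminder of the structures involved, here is a minimal SQL sketch of the two tables (column names as used throughout this post; the actual tutorial schema differs in details such as constraints and defaults):

CREATE TABLE customers (
  id         INTEGER AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255),
  last_name  VARCHAR(255),
  email      VARCHAR(255)
);

CREATE TABLE orders (
  order_number INTEGER AUTO_INCREMENT PRIMARY KEY,
  order_date   DATE,
  purchaser    INTEGER,
  quantity     INTEGER,
  product_id   INTEGER,
  FOREIGN KEY (purchaser) REFERENCES customers(id)
);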

Configuration

We are going to use a Docker Compose file for the deployment of the environment. The deployment consists of the following Docker images:

  • Apache ZooKeeper

  • Apache Kafka

  • A Kafka Connect instance with the Debezium connectors installed

  • A pre-populated MySQL instance, as used in our tutorial

We also need the KSQL client. To make things simple we are going to use a pre-built Docker image but you can download and directly use the client from the KSQL download page.

Example

First we need to start the Debezium and Kafka infrastructure. To do so, clone the debezium-examples GitHub repository and start the required components using the provided Compose file:

export DEBEZIUM_VERSION=0.7
git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/tutorial/
docker-compose -f docker-compose-mysql.yaml up

Next we must register an instance of the Debezium MySQL connector to listen to changes in the database:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<-EOF
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184055",
        "database.server.name": "dbserver",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
EOF

Now we should have all components up and running, and the initial data change events have already been streamed into Kafka topics. There are multiple properties that are especially important for our use case:

  • The UnwrapFromEnvelope SMT is used. This allows us to directly map fields from the after part of change records into KSQL statements. Without it, we would need to use EXTRACTJSONFIELD for each field to be extracted from the after part of messages.

  • Schemas are disabled for the JSON converter. The reason is the same as above. With schemas enabled, the JSON record is encapsulated in a JSON structure that contains the fields schema (with schema information) and payload (with the actual data itself). We would again need to use EXTRACTJSONFIELD to get to the relevant fields. There is no such issue with the Avro converter, so this option does not need to be set when Avro is used. (A sample message value with these settings applied is shown right after this list.)
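
With the SMT applied and schemas disabled, the value of a message on the dbserver.inventory.orders topic is just the flat row state. For illustration, here is the first order from the tutorial data (note that the DATE column is transported by Debezium as an integer counting the days since epoch):

{
  "order_number": 10001,
  "order_date": 16816,
  "purchaser": 1001,
  "quantity": 1,
  "product_id": 102
}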

Next we are going to start the KSQL command shell. We will run a local engine in the CLI. Also please note the --net parameter. It guarantees that the KSQL container runs in the same network as the Debezium containers and allows proper DNS resolution.

docker run -it --net tutorial_default confluentinc/ksql-cli ksql-cli local --bootstrap-server kafka:9092

First we will list all Kafka topics that exist in the broker:

ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
 connect-status                      | false      | 5          | 1
 dbserver                            | false      | 1          | 1
 dbserver.inventory.addresses        | false      | 1          | 1
 dbserver.inventory.customers        | false      | 1          | 1
 dbserver.inventory.orders           | false      | 1          | 1
 dbserver.inventory.products         | false      | 1          | 1
 dbserver.inventory.products_on_hand | false      | 1          | 1
 ksql__commands                      | true       | 1          | 1
 my_connect_configs                  | false      | 1          | 1
 my_connect_offsets                  | false      | 25         | 1
 schema-changes.inventory            | false      | 1          | 1

The topics we are interested in are dbserver.inventory.orders and dbserver.inventory.customers.

KSQL processing by default starts with the latest offsets. We want to process the events already in the topics, so we switch processing to the earliest offsets:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

First we need to create streams from the topics containing the Debezium data change events. A stream in KSQL and Kafka Streams terminology is an unbounded incoming data set with no state.

ksql> CREATE STREAM orders_from_debezium (order_number integer, order_date string, purchaser integer, quantity integer, product_id integer) WITH (KAFKA_TOPIC='dbserver.inventory.orders',VALUE_FORMAT='json');

 Message
----------------
 Stream created
ksql>
ksql> CREATE STREAM customers_from_debezium (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='dbserver.inventory.customers',VALUE_FORMAT='json');

 Message
----------------
 Stream created

Partitioning

Our deployment uses only one partition per topic. In a production system there will likely be multiple partitions per topic, and we need to ensure that all events belonging to our aggregated object end up in the same partition. The natural partitioning in our case is per customer id. We are going to repartition the orders_from_debezium stream according to the purchaser field, which contains the customer id. The repartitioned data are written into a new topic, ORDERS_REPART:

ksql> CREATE STREAM orders WITH (KAFKA_TOPIC='ORDERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM orders_from_debezium PARTITION BY PURCHASER;

 Message
----------------------------
 Stream created and running
ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
 ORDERS_REPART                       | true       | 1          | 1
...

We are going to execute the same operation for customers too. It is necessary for two reasons:

  • The current key is a struct that contains a field named id with the customer id. This is different from the repartitioned orders topic, which contains only the id value as the key, so the partitions would not match (see the sketch after this list).

  • When we create a JOIN later, there is a limitation that requires the key to have the same value as a key field in the table. The table field contains a plain value but the key contains a struct, so they would not match. See this KSQL issue for more details.
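
For illustration (values assumed from the tutorial data), the message keys before and after repartitioning look roughly like this:

Key on dbserver.inventory.customers:       {"id":1001}
Key on CUSTOMERS_REPART (created below):   1001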

ksql> CREATE STREAM customers_stream WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM customers_from_debezium PARTITION BY ID;

 Message
----------------------------
 Stream created and running
ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
 CUSTOMERS_REPART                    | true       | 1          | 1
...

To verify that the records have a new key and are thus repartitioned, we can issue a few statements to compare the results:

ksql> SELECT * FROM orders_from_debezium LIMIT 1;
1524034842810 | {"order_number":10001} | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated
ksql> SELECT * FROM orders LIMIT 1;
1524034842810 | 1001 | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated

The second column contains ROWKEY which is the key of the message.

Customer/order join

So far we have only declared streams, i.e. unbounded, stateless data sets. In our use case an order is really an event that comes and goes. A customer, however, is an entity that can be updated and generally forms a part of the state of the system. Such a quality is represented in KSQL or Kafka Streams as a table. We are going to create a table of customers from the topic containing the repartitioned customers.

ksql> CREATE TABLE customers (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',KEY='id');

 Message
---------------
 Table created

Now we have everything in place to make a join between a customer and their orders and to create a query that will monitor incoming orders and list them together with the associated customer fields.

ksql> SELECT order_number,quantity,customers.first_name,customers.last_name FROM orders left join customers on orders.purchaser=customers.id;
10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker

Let’s apply a few changes to the database, which will result in corresponding CDC events being emitted by Debezium:

docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

mysql> INSERT INTO orders VALUES(default,NOW(), 1003,5,101);
Query OK, 1 row affected, 1 warning (0.02 sec)

mysql> UPDATE customers SET first_name='Annie' WHERE id=1004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> UPDATE orders SET quantity=20 WHERE order_number=10004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

You may notice that only changes in the orders table have triggered changes in the joined stream. This is a product of the stream/table join. We would need a stream/stream join to trigger changes when either of the input streams is modified.
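
For reference, newer KSQL versions also support windowed stream/stream joins. A rough sketch of such a join against the customers_stream stream created earlier could look like the following; treat it as an illustration only, as the exact syntax depends on the KSQL version:

SELECT o.order_number, o.quantity, c.first_name, c.last_name FROM orders o LEFT JOIN customers_stream c WITHIN 1 HOURS ON o.purchaser = c.id;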

So the final result of the SELECT statement after the database is modified is:

10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker
10005 | 5 | Edward | Walker
10004 | 20 | Edward | Walker

Summary

We have successfully started a KSQL instance. We have mapped KSQL streams to the topics filled by Debezium and made a join between them. We have also discussed the problem of repartitioning in streaming applications.

If you’d like to try out this example with Avro encoding and schema registry then you can use our Avro example. Also for further details and more advanced usages just refer to the KSQL syntax reference.

In case you need help, have feature requests or would like to share your experiences with this example, please let us know in the comments below.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.


Streaming Data Changes from Your Database to Elasticsearch

We wish all the best to the Debezium community for 2018!

While we’re working on the 0.7.2 release, we thought we’d publish another post describing an end-to-end data streaming use case based on Debezium. A few weeks ago we saw how to set up a change data stream to a downstream database. In this blog post we will follow the same approach to stream the data to an Elasticsearch server, to leverage its excellent capabilities for full-text search on our data. But to make the matter a little bit more interesting, we will stream the data to both a PostgreSQL database and Elasticsearch, so we optimize access to the data via the SQL query language as well as via full-text search.

Topology

Here’s a diagram that shows how the data is flowing through our distributed system. First, the Debezium MySQL connector is continuously capturing the changes from the MySQL database, and sending the changes for each table to separate Kafka topics. Then, the Confluent JDBC sink connector is continuously reading those topics and writing the events into the PostgreSQL database. And, at the same time, the Confluent Elasticsearch connector is continuously reading those same topics and writing the events into Elasticsearch.

 

Scenario topology
Figure 1: A general topology

 

We are going to deploy these components into several different processes. In this example, we’ll deploy all three connectors to a single Kafka Connect instance that will write to and read from Kafka on behalf of all of the connectors (in production you might need to keep the connectors separated to achieve better performance).

 

Scenario topology
Figure 2: A simplified topology

Configuration

We will use this Docker Compose file for a fast deployment of the demo. The deployment consists of the following Docker images:

  • Apache ZooKeeper

  • Apache Kafka

  • An enriched Kafka Connect / Debezium image with a few changes:

    • PostgreSQL JDBC driver placed into /kafka/libs directory

    • The Confluent JDBC connector placed into /kafka/connect/kafka-connect-jdbc directory

  • Pre-populated MySQL as used in our tutorial

  • Empty PostgreSQL

  • Empty Elasticsearch

The message format is not the same for the Debezium source connector and the JDBC and Elasticsearch connectors as they are developed separately and each focuses on slightly different objectives. Debezium emits a more complex event structure so that it captures all of the information available. In particular, the change events contain the old and the new state of a changed record. Both sink connectors on the other hand expect a simple message that just represents the record state to be written.

Debezium’s UnwrapFromEnvelope single message transformation (SMT) collapses the complex change event structure into the same row-based format expected by the two sink connectors and effectively acts as a message translator between the two aforementioned formats.
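
For illustration, here is a (simplified) update event for a customers row before and after the SMT is applied; metadata fields such as source and ts_ms are omitted and the values are just examples:

Debezium envelope as written to Kafka:

{
  "op": "u",
  "before": { "id": 1004, "first_name": "Anne",  "last_name": "Kretchmar", "email": "annek@noanswer.org" },
  "after":  { "id": 1004, "first_name": "Annie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }
}

Message after UnwrapFromEnvelope, as consumed by the sink connectors:

{ "id": 1004, "first_name": "Annie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }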

Example

Let’s move directly to our example as that’s where the changes are visible. First of all we need to deploy all components:

export DEBEZIUM_VERSION=0.7
docker-compose up

When all components are started we are going to register the Elasticsearch sink connector writing into the Elasticsearch instance. We want to use the same key (the primary id) in the source and in both PostgreSQL and Elasticsearch:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @es-sink.json

We’re using this registration request:

{
  "name": "elastic-sink",
  "config": {
    "connector.class":
        "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "http://elastic:9200",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        (1)
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
    "transforms.key.field": "id",                                                 (2)
    "key.ignore": "false",                                                        (3)
    "type.name": "customer"                                                       (4)
  }
}

The request configures these options:

  1. extracting only the new row’s state from Debezium’s change data message

  2. extracting the id field from the key struct, so that the same key is used for the source and for both destinations. This addresses the fact that the Elasticsearch connector only supports numeric types and strings as keys; if we did not extract the id, the messages would be filtered out by the connector because of the unknown key type (see the sketch after this list).

  3. use the key from the event instead of generating a synthetic one

  4. the type under which the events will be registered in Elasticsearch
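
As a sketch (key shown as JSON for readability, value purely illustrative), a record key changes like this as it passes through the ExtractField$Key transformation:

Key as written by Debezium:    {"id": 1001}
Key after ExtractField$Key:    1001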

Next we are going to register the JDBC sink connector writing into the PostgreSQL database:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @jdbc-sink.json

Finally, the source connector must be set up:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @source.json

Let’s check if the databases and the search server are synchronized. All the rows of the customers table should be found in the source database (MySQL) as well as the target database (Postgres) and Elasticsearch:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
curl 'http://localhost:9200/customers/_search?pretty'
{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}

With the connectors still running, we can add a new row to the MySQL database and then check that it was replicated into both the PostgreSQL database and Elasticsearch:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'

mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
Doe        | 1005 | John       | john.doe@example.com
(5 rows)
curl 'http://localhost:9200/customers/_search?pretty'
...
{
  "_index" : "customers",
  "_type" : "customer",
  "_id" : "1005",
  "_score" : 1.0,
  "_source" : {
    "id" : 1005,
    "first_name" : "John",
    "last_name" : "Doe",
    "email" : "john.doe@example.com"
  }
}
...

Summary

We set up a complex streaming data pipeline to synchronize a MySQL database with another database and also with an Elasticsearch instance. We managed to keep the same identifier across all systems which allows us to correlate records across the system as a whole.

Propagating data changes from a primary database in near real-time to a search engine such as Elasticsearch enables many interesting use cases. Besides various applications of full-text search, one could for instance also think about creating dashboards and all kinds of visualizations using Kibana, to gain further insight into the data.

If you’d like to try out this set-up yourself, just clone the project from our examples repo. In case you need help, have feature requests or would like to share your experiences with this pipeline, please let us know in the comments below.



Streaming data to a downstream database

In this blog post we will create a simple streaming data pipeline to continuously capture the changes in a MySQL database and replicate them in near real-time into a PostgreSQL database. We’ll show how to do this without writing any code, but instead by using and configuring Kafka Connect, the Debezium MySQL source connector, the Confluent JDBC sink connector, and a few single message transforms (SMTs).

This approach of replicating data through Kafka is really useful on its own, but it becomes even more advantageous when we can combine our near real-time streams of data changes with other streams, connectors, and stream processing applications. A recent Confluent blog post series shows a similar streaming data pipeline but using different connectors and SMTs. What’s great about Kafka Connect is that you can mix and match connectors to move data between multiple systems.

We will also demonstrate a new functionality that was released with Debezium 0.6.0: a single message transform for CDC Event Flattening.

Topology

The general topology for this scenario is displayed in the following picture:

Scenario topology
Figure 1: A General topology

 

To simplify the setup a little bit, we will use only one Kafka Connect instance that will contain all the connectors. That is, this instance will serve as both an event producer and an event consumer:

 

Scenario topology
Figure 2: A Simplified topology

Configuration

We will use this Docker Compose file for a fast deployment of the demo. The deployment consists of the following Docker images:

  • Apache ZooKeeper

  • Apache Kafka

  • An enriched Kafka Connect / Debezium image with the PostgreSQL JDBC driver and the Confluent JDBC connector added

  • Pre-populated MySQL as used in our tutorial

  • Empty PostgreSQL

The Debezium MySQL Connector was designed to specifically capture database changes and to provide as much information as possible about those events, beyond just the new state of each row. Meanwhile, the Confluent JDBC Sink Connector was designed to simply convert each message into a database insert/upsert based upon the structure of the message. So the two connectors not only have different structures for the messages, they also use different topic naming conventions and different ways of representing deleted records.

These mismatches in structure and behavior will be common when using connectors that were not designed to work together. But this is something that we can easily deal with, and we discuss how in the next few sections.

Event format

Debezium emits events in a complex format that contains all of the information about the captured data change: the type of operation, source metadata, the timestamp at which the event was processed by the connector, and the state of the row before and after the change was made. Debezium calls this structure an "envelope":

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

Many other Kafka Connect source connectors don’t have the luxury of knowing this much about the changes, and instead use a simpler model where each message directly represents the after state of the row. This is also what many sink connectors expect, and the Confluent JDBC Sink Connector is no different:

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

While we think it’s actually a great thing that Debezium CDC connectors provide as much detail as possible, we also make it easy for you to transform Debezium’s "envelope" format into the "row" format that is expected by many other connectors. Debezium provides a bridge between those two formats in the form of a single message transform. The UnwrapFromEnvelope transformation automatically extracts the new row state and thus effectively flattens the complex record into a simple one consumable by other connectors.

You can use this SMT on the source connector to transform the message before it is written to Kafka, or you can instead store the source connector’s richer "envelope" form of the message in Kafka and use this SMT on the sink connector to transform the message after it is read from Kafka and before it is passed to the sink connector. Both options work, and it just depends on whether you find the envelope form of the message useful for other purposes.

In our example we apply the SMT at the sink connector using these configuration properties:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

Delete records

When the Debezium connector detects a row is deleted, it creates two event messages: a delete event and a tombstone message. The delete message has an envelope with the state of the deleted row in the before field and an after field that is null. The tombstone message contains the same key as the delete message, but the entire message value is null, and Kafka’s log compaction uses this to know that it can remove any earlier messages with the same key. A number of sink connectors, including Confluent’s JDBC Sink Connector, do not expect these messages and will instead fail if they see either kind of message. The UnwrapFromEnvelope SMT will by default filter out both delete and tombstone records, though you can change this if you’re using the SMT and want to keep one or both of these kinds of messages.
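
For illustration, deleting a customer row produces two messages on the topic, roughly like this (metadata fields omitted, values purely illustrative):

Delete event
  key:   {"id": 1005}
  value: { "op": "d", "before": { "id": 1005, "first_name": "John", "last_name": "Doe", "email": "john.doe@example.com" }, "after": null }

Tombstone
  key:   {"id": 1005}
  value: null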

Topic naming

Last but not least, there is a difference in the naming of topics. Debezium uses fully qualified names for the target topics representing each table it manages, following the pattern <logical-name>.<database-name>.<table-name>. The Kafka Connect JDBC Connector, on the other hand, works with simple names of the form <table-name>.
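
For example, with the logical name dbserver1 used by the source connector below, the mapping we need looks like this:

dbserver1.inventory.customers  ->  customers
dbserver1.inventory.orders     ->  orders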

In more complex scenarios the user may deploy the Kafka Streams framework to establish elaborate routing between source and target topics. In our example we will use the stock RegexRouter SMT, which routes records created by Debezium into topics named according to the JDBC Connector’s convention. Again, we could use this SMT in either the source or sink connectors, but for this example we’re going to use it in the source connector so we can choose the names of the Kafka topics where the records will be written.

"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"

Example

Kick the tires and let’s try our example!

First of all we need to deploy all components.

export DEBEZIUM_VERSION=0.6
docker-compose up

When all components are started we are going to register the JDBC Sink connector writing into the PostgreSQL database:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

Using this registration request:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",                                                  (1)
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",   (1)
        "auto.create": "true",                                                   (2)
        "insert.mode": "upsert",                                                 (3)
        "pk.fields": "id",                                                       (4)
        "pk.mode": "record_value"                                                (4)
    }
}

The request configures these options:

  1. unwrapping Debezium’s complex format into a simple one

  2. automatically create target tables

  3. insert a row if it does not exist or update an existing one (see the conceptual sketch after this list)

  4. identify the primary key stored in Kafka’s record value field
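
Conceptually, with these settings each record from the customers topic results in a PostgreSQL statement along the following lines; the SQL actually generated by the JDBC connector may differ in its details:

INSERT INTO customers (id, first_name, last_name, email)
VALUES (1001, 'Sally', 'Thomas', 'sally.thomas@acme.com')
ON CONFLICT (id) DO UPDATE SET
    first_name = EXCLUDED.first_name,
    last_name  = EXCLUDED.last_name,
    email      = EXCLUDED.email;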

Then the source connector must be set up:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

Using this registration request:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",                                         (1)
        "database.whitelist": "inventory",                                           (2)
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "route",                                                       (3)
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",  (3)
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",                     (3)
        "transforms.route.replacement": "$3"                                         (3)
    }
}

The request configures these options:

  1. logical name of the database

  2. the database we want to monitor

  3. an SMT which defines a regular expression matching the topic name <logical-name>.<database-name>.<table-name> and extracts the third part of it as the final topic name (e.g. dbserver1.inventory.customers becomes customers)

Let’s check if the databases are synchronized. All the rows of the customers table should be found in the source database (MySQL) as well as the target database (Postgres):

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org

With the connectors still running, we can add a new row to the MySQL database and then check that it was replicated into the PostgreSQL database:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
Doe        | 1005 | John       | john.doe@example.com
(5 rows)

Summary

We set up a simple streaming data pipeline to replicate data in near real-time from a MySQL database to a PostgreSQL database. We accomplished this using Kafka Connect, the Debezium MySQL source connector, the Confluent JDBC sink connector, and a few SMTs — all without having to write any code. And since it is a streaming system, it will continue to capture all changes made to the MySQL database and replicate them in near real time.

What’s next?

In a future blog post we will reproduce the same scenario with Elasticsearch as a target for events.


