

Streaming Data Changes from Your Database to Elasticsearch

We wish all the best to the Debezium community for 2018!

While we’re working on the 0.7.2 release, we thought we’d publish another post describing an end-to-end data streaming use case based on Debezium. A few weeks ago we saw how to set up a change data stream to a downstream database. In this blog post we will follow the same approach to stream the data to an Elasticsearch server, to leverage its excellent full-text search capabilities. To make matters a little more interesting, we will stream the data to both a PostgreSQL database and Elasticsearch, so that the data can be accessed efficiently via the SQL query language as well as via full-text search.

Topology

Here’s a diagram that shows how data flows through our distributed system. First, the Debezium MySQL connector continuously captures the changes in the MySQL database and sends the changes for each table to a separate Kafka topic. Then, the Confluent JDBC sink connector continuously reads those topics and writes the events into the PostgreSQL database. At the same time, the Confluent Elasticsearch connector continuously reads those same topics and writes the events into Elasticsearch.

Figure 1: A general topology

We are going to deploy these components into several different processes. The three connectors, however, will all be deployed to a single Kafka Connect instance that writes to and reads from Kafka on behalf of all of them (in production you might need to keep the connectors separated to achieve better performance).

Figure 2: A simplified topology
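Both sink connectors read the same topics without interfering with each other. In Kafka Connect, each sink connector consumes through its own consumer group (by default named `connect-` followed by the connector name), so each one keeps its own offsets. The toy Python model below illustrates that idea; the topic is just an in-memory list, not a Kafka client API, and the class and variable names are ours:

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical in-memory stand-in for the "customers" Kafka topic: an append-only log.
customers_topic: List[dict] = []


@dataclass
class SinkConsumer:
    """Toy sink that remembers its own position (offset) in the log, like a consumer group."""
    name: str
    offset: int = 0
    written: List[dict] = field(default_factory=list)

    def poll(self, log: List[dict]) -> None:
        # Process every record appended since the last poll, then advance this sink's offset.
        for record in log[self.offset:]:
            self.written.append(record)
        self.offset = len(log)


jdbc_sink = SinkConsumer("jdbc-sink")
elastic_sink = SinkConsumer("elastic-sink")

# The source side only appends; it does not know how many sinks are reading.
customers_topic.append({"id": 1001, "first_name": "Sally"})
jdbc_sink.poll(customers_topic)
customers_topic.append({"id": 1002, "first_name": "George"})
jdbc_sink.poll(customers_topic)     # reads only the record it has not seen yet
elastic_sink.poll(customers_topic)  # starts later, but still catches up on both records
```

Because each sink tracks its own position, adding a third consumer of the same topics would not change what the existing two see.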

Configuration

We will use this Docker Compose file for a fast deployment of the demo. The deployment consists of the following Docker images:

  • Apache ZooKeeper

  • Apache Kafka

  • An enriched Kafka Connect / Debezium image with a few changes:

    • PostgreSQL JDBC driver placed into /kafka/libs directory

    • The Confluent JDBC connector placed into /kafka/connect/kafka-connect-jdbc directory

  • Pre-populated MySQL as used in our tutorial

  • Empty PostgreSQL

  • Empty Elasticsearch

The message format is not the same for the Debezium source connector and the JDBC and Elasticsearch sink connectors, as they are developed separately and each focuses on slightly different objectives. Debezium emits a more complex event structure so that it captures all of the information available. In particular, the change events contain the old and the new state of a changed record. Both sink connectors, on the other hand, expect a simple message that just represents the record state to be written.

Debezium’s UnwrapFromEnvelope single message transformation (SMT) collapses the complex change event structure into the same row-based format expected by the two sink connectors and effectively acts as a message translator between the two aforementioned formats.
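Conceptually, the translation keeps only the new row state from the envelope. The following Python function is a toy model of that idea for create and update events, not the SMT's actual Java implementation; the function name and the sample event (including its abbreviated `source` block and made-up timestamp) are ours:

```python
from typing import Optional


def unwrap_from_envelope(event: dict) -> Optional[dict]:
    """Toy model of flattening a Debezium change event to the row format sinks expect.

    Only the "after" state survives; "op", "source", "ts_ms" and "before" are dropped.
    """
    return event.get("after")


change_event = {
    "op": "u",
    "source": {"name": "dbserver1"},  # abbreviated, illustrative metadata
    "ts_ms": 1515000000000,           # made-up timestamp
    "before": {"id": 1004, "first_name": "Anne", "email": "annek@noanswer.org"},
    "after": {"id": 1004, "first_name": "Anne Marie", "email": "annek@noanswer.org"},
}

row = unwrap_from_envelope(change_event)
print(row)  # the "after" state only
```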

Example

Let’s move directly to our example, as that’s where the changes are visible. First of all, we need to deploy all components:

export DEBEZIUM_VERSION=0.7
docker-compose up

Once all components have started, we register the Elasticsearch sink connector, which writes into the Elasticsearch instance. We want to use the same key (the primary key, id) in the source and in both PostgreSQL and Elasticsearch:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @es-sink.json

We’re using this registration request:

{
    "name": "elastic-sink",
    "config": {
      "connector.class":
          "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "1",
      "topics": "customers",
      "connection.url": "http://elastic:9200",
      "transforms": "unwrap,key",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        (1)
      "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
      "transforms.key.field": "id",                                                 (2)
      "key.ignore": "false",                                                        (3)
      "type.name": "customer"                                                       (4)
    }
}

The request configures these options:

  1. extracting only the new row’s state from Debezium’s change data message

  2. extracting the id field from the key struct, so that the same key is used for the source and both destinations. This addresses the fact that the Elasticsearch connector supports only numeric and string types as keys; if we do not extract the id, the connector will filter out the messages because of the unknown key type.

  3. using the key from the event instead of generating a synthetic one

  4. the type under which the events will be registered in Elasticsearch
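Options 2 and 3 together determine the Elasticsearch document id. The Python sketch below is a simplified model of that path, not the connectors' code: `ExtractField$Key` replaces the key struct with its `id` field, and the resulting value is turned into a document id only if it is numeric or a string, mirroring the restriction described above. The function names are hypothetical, and returning `None` stands in for the message being dropped:

```python
from typing import Optional


def extract_key_field(key_struct: dict, field_name: str = "id"):
    """Model of ExtractField$Key: replace the whole record key with one of its fields."""
    return key_struct[field_name]


def document_id(key) -> Optional[str]:
    """Toy model of how a record key becomes an Elasticsearch _id when key.ignore=false.

    Only numeric and string keys are usable; anything else (such as an un-extracted
    struct) returns None here to stand in for the message being filtered out.
    """
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(key, bool) or not isinstance(key, (int, float, str)):
        return None
    return str(key)


raw_key = {"id": 1001}                          # key struct for customer row 1001
print(document_id(raw_key))                     # prints None
print(document_id(extract_key_field(raw_key)))  # prints 1001
```

The extracted value `1001` is the same one that shows up as `"_id" : "1001"` in the search results later in this post.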

Next, we register the JDBC sink connector, which writes into the PostgreSQL database:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @jdbc-sink.json

Finally, the source connector must be set up:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @source.json

Let’s check if the databases and the search server are synchronized. All the rows of the customers table should be found in the source database (MySQL) as well as the target database (Postgres) and Elasticsearch:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org

curl 'http://localhost:9200/customers/_search?pretty'
{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}
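If you want to script this check, a small Python sketch can parse the search response and confirm that every document's `_id` equals its `id` field, that is, that the MySQL primary key was carried through as the document key. It assumes you save the response first (for example with `curl -s 'http://localhost:9200/customers/_search' > response.json`) and pass the text in; the function name `checked_ids` is ours, and the embedded sample is a trimmed copy of the output above:

```python
import json

# Trimmed copy of the search response above (two of the four hits; "took"/"_shards" omitted).
SAMPLE_RESPONSE = """
{
  "hits": {
    "total": 4,
    "hits": [
      {"_index": "customers", "_type": "customer", "_id": "1001",
       "_source": {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
                   "email": "sally.thomas@acme.com"}},
      {"_index": "customers", "_type": "customer", "_id": "1004",
       "_source": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar",
                   "email": "annek@noanswer.org"}}
    ]
  }
}
"""


def checked_ids(response_text: str) -> list:
    """Return the sorted primary keys, verifying each document's _id against its id field."""
    body = json.loads(response_text)
    ids = []
    for hit in body["hits"]["hits"]:
        if hit["_id"] != str(hit["_source"]["id"]):
            raise ValueError(f"document {hit['_id']} has id {hit['_source']['id']}")
        ids.append(hit["_source"]["id"])
    return sorted(ids)


print(checked_ids(SAMPLE_RESPONSE))  # prints [1001, 1004]
```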

With the connectors still running, we can add a new row to the MySQL database and then check that it was replicated into both the PostgreSQL database and Elasticsearch:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'

mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
 Doe       | 1005 | John       | john.doe@example.com
(5 rows)

curl 'http://localhost:9200/customers/_search?pretty'
...
{
  "_index" : "customers",
  "_type" : "customer",
  "_id" : "1005",
  "_score" : 1.0,
  "_source" : {
    "id" : 1005,
    "first_name" : "John",
    "last_name" : "Doe",
    "email" : "john.doe@example.com"
  }
}
...

Summary

We set up a complex streaming data pipeline to synchronize a MySQL database with another database and also with an Elasticsearch instance. We managed to keep the same identifier across all systems, which allows us to correlate records across the system as a whole.

Propagating data changes from a primary database in near real time to a search engine such as Elasticsearch enables many interesting use cases. Besides the various applications of full-text search, one could, for instance, create dashboards and all kinds of visualizations using Kibana to gain further insight into the data.

If you’d like to try out this set-up yourself, just clone the project from our examples repo. In case you need help, have feature requests or would like to share your experiences with this pipeline, please let us know in the comments below.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas about how we can improve Debezium, please let us know or log an issue.


Streaming data to a downstream database

In this blog post we will create a simple streaming data pipeline to continuously capture the changes in a MySQL database and replicate them in near real-time into a PostgreSQL database. We’ll show how to do this without writing any code, but instead by using and configuring Kafka Connect, the Debezium MySQL source connector, the Confluent JDBC sink connector, and a few single message transforms (SMTs).

This approach of replicating data through Kafka is really useful on its own, but it becomes even more advantageous when we can combine our near real-time streams of data changes with other streams, connectors, and stream processing applications. A recent Confluent blog post series shows a similar streaming data pipeline but using different connectors and SMTs. What’s great about Kafka Connect is that you can mix and match connectors to move data between multiple systems.

We will also demonstrate new functionality that was released with Debezium 0.6.0: a single message transform for CDC event flattening.

Topology

The general topology for this scenario is shown in the following figure:

Figure 1: A general topology

To simplify the setup a little, we will use only one Kafka Connect instance that runs all of the connectors; i.e., this instance serves as both an event producer and an event consumer:

Figure 2: A simplified topology

Configuration

We will use this Docker Compose file for a fast deployment of the demo. The deployment consists of the following Docker images:

The Debezium MySQL Connector was designed specifically to capture database changes and to provide as much information as possible about those events, beyond just the new state of each row. Meanwhile, the Confluent JDBC Sink Connector was designed simply to convert each message into a database insert/upsert based upon the structure of the message. So the two connectors not only use different message structures, but also different topic naming conventions and different ways of representing deleted records.

These mismatches in structure and behavior will be common when using connectors that were not designed to work together. But this is something that we can easily deal with, and we discuss how in the next few sections.

Event format

Debezium emits events in a complex format that contains all of the information about the captured data change: the type of operation, source metadata, the timestamp at which the event was processed by the connector, and the state of the row before and after the change was made. Debezium calls this structure an "envelope":

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

Many other Kafka Connect source connectors don’t have the luxury of knowing this much about the changes, and instead use a simpler model where each message directly represents the after state of the row. This is also what many sink connectors expect, and the Confluent JDBC Sink Connector is no different:

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

While we think it’s actually a great thing that Debezium CDC connectors provide as much detail as possible, we also make it easy for you to transform Debezium’s "envelope" format into the "row" format that is expected by many other connectors. Debezium provides a bridge between those two formats in the form of a single message transform. The UnwrapFromEnvelope transformation extracts the new row state from each event and thus effectively flattens the complex record into a simple one that other connectors can consume.

You can use this SMT on the source connector to transform the message before it is written to Kafka, or you can instead store the source connector’s richer "envelope" form of the message in Kafka and use this SMT on the sink connector to transform the message after it is read from Kafka and before it is passed to the sink connector. Both options work, and it just depends on whether you find the envelope form of the message useful for other purposes.

In our example we apply the SMT at the sink connector using these configuration properties:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

Delete records

When the Debezium connector detects that a row has been deleted, it creates two event messages: a delete event and a tombstone message. The delete message has an envelope with the state of the deleted row in the before field and an after field that is null. The tombstone message contains the same key as the delete message, but the entire message value is null; Kafka’s log compaction uses this to know that it can remove any earlier messages with the same key. A number of sink connectors, including Confluent’s JDBC Sink Connector, do not expect these messages and will fail if they see either kind. By default, the UnwrapFromEnvelope SMT filters out both delete and tombstone records, though you can change this if you want to keep one or both kinds of messages.
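To make the default filtering concrete, here is a toy Python model of how a stream of (key, value) records for one row would look before and after flattening. It models only the default behaviour described above (both deletes and tombstones dropped) and does not reproduce the SMT's options; the function name and the sample records for a hypothetical row 1005 are ours:

```python
from typing import List, Optional, Tuple

Record = Tuple[dict, Optional[dict]]  # (key, value); a value of None is a tombstone


def flatten_with_default_filtering(records: List[Record]) -> List[Tuple[dict, dict]]:
    """Toy model of UnwrapFromEnvelope's default behaviour on a sequence of records."""
    flattened = []
    for key, value in records:
        if value is None:
            continue  # tombstone: the entire value is null, dropped by default
        if value["op"] == "d":
            continue  # delete event: "after" is null, dropped by default
        flattened.append((key, value["after"]))
    return flattened


key = {"id": 1005}
row = {"id": 1005, "first_name": "John", "last_name": "Doe", "email": "john.doe@example.com"}
stream = [
    (key, {"op": "c", "before": None, "after": row}),  # insert
    (key, {"op": "d", "before": row, "after": None}),  # delete event
    (key, None),                                       # tombstone with the same key
]
print(flatten_with_default_filtering(stream))  # only the insert survives
```

Note the consequence for a sink such as the JDBC connector: with this default, a row deleted in MySQL is not deleted downstream, because the sink never sees the deletion.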

Topic naming

Last but not least, there is a difference in topic naming. Debezium uses fully qualified names for the topics representing each table it captures, following the pattern <logical-name>.<database-name>.<table-name>. The Kafka Connect JDBC sink connector, by default, uses the topic name as the target table name, so it works with simple names of the form <table-name>.

In more complex scenarios, the user may deploy the Kafka Streams framework to establish elaborate routing between source and target topics. In our example we will use the stock RegexRouter SMT, which routes the records created by Debezium into topics named according to the JDBC connector’s convention. Again, we could use this SMT in either the source or the sink connector, but for this example we’re going to use it in the source connector so we can choose the names of the Kafka topics where the records will be written.

"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
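The effect of this configuration on topic names can be checked with a short Python model. RegexRouter renames a topic only when the regular expression matches the whole topic name, and the replacement uses Java's `$3` group syntax; the sketch below translates that to Python's `\g<3>`. It is a toy model for experimenting with the pattern, not the SMT itself, and the function name `route` is ours:

```python
import re

# The regex from the config above, with the JSON string escaping removed.
TOPIC_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"


def route(topic: str, regex: str = TOPIC_REGEX, replacement: str = "$3") -> str:
    """Toy model of RegexRouter: rename a topic only if the regex matches the whole name."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topic names pass through unchanged
    # Java's replacement syntax uses $3 for group 3; Python's match.expand uses \g<3>.
    python_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(python_replacement)


print(route("dbserver1.inventory.customers"))  # prints customers
print(route("dbserver1"))                       # prints dbserver1
```

Because `[^.]+` cannot match a dot, the pattern only applies to names with exactly three dot-separated parts, which is why a name such as `dbserver1` is left alone.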

Example

Kick the tires and let’s try our example!

First of all we need to deploy all components.

export DEBEZIUM_VERSION=0.6
docker-compose up

Once all components have started, we register the JDBC sink connector, which writes into the PostgreSQL database:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

Using this registration request:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",                                                  (1)
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",   (1)
        "auto.create": "true",                                                   (2)
        "insert.mode": "upsert",                                                 (3)
        "pk.fields": "id",                                                       (4)
        "pk.mode": "record_value"                                                (4)
    }
}

The request configures these options:

  1. unwrapping Debezium’s complex format into a simple one

  2. automatically creating the target tables

  3. inserting a row if it does not exist, or updating the existing one

  4. taking the primary key from the id field of the Kafka record’s value
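The upsert semantics configured by options 3 and 4 can be illustrated with a small, self-contained Python sketch. It uses SQLite from the standard library instead of PostgreSQL (the `ON CONFLICT ... DO UPDATE` clause needs SQLite 3.24 or newer), and the hand-written statement is our own illustration, not the SQL the connector generates; the function name `upsert_customer` and the updated email address are hypothetical. Writing the same key twice leaves a single row holding the latest values, which is why an upsert-based sink tolerates seeing the same row more than once:

```python
import sqlite3


def upsert_customer(conn: sqlite3.Connection, row: dict) -> None:
    """Hypothetical analogue of insert.mode=upsert with pk.mode=record_value, pk.fields=id."""
    conn.execute(
        """
        INSERT INTO customers (id, first_name, last_name, email)
        VALUES (:id, :first_name, :last_name, :email)
        ON CONFLICT (id) DO UPDATE SET
            first_name = excluded.first_name,
            last_name = excluded.last_name,
            email = excluded.email
        """,
        row,
    )


conn = sqlite3.connect(":memory:")
# auto.create=true would derive this table from the record schema; here we create it by hand.
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, email TEXT)"
)
upsert_customer(conn, {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
                       "email": "sally.thomas@acme.com"})
# A later event for the same primary key updates the row instead of failing on a duplicate.
upsert_customer(conn, {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
                       "email": "sally.thomas@example.com"})
print(conn.execute("SELECT id, email FROM customers").fetchall())
```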

Then the source connector must be set up:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

Using this registration request:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",                                         (1)
        "database.whitelist": "inventory",                                           (2)
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "route",                                                       (3)
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",  (3)
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",                     (3)
        "transforms.route.replacement": "$3"                                         (3)
    }
}

The request configures these options:

  1. the logical name of the MySQL server, used as the prefix of the topic names

  2. the database we want to monitor

  3. an SMT which defines a regular expression matching the topic name <logical-name>.<database-name>.<table-name> and extracts the third part of it as the final topic name

Let’s check if the databases are synchronized. All the rows of the customers table should be found in the source database (MySQL) as well as the target database (Postgres):

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org

With the connectors still running, we can add a new row to the MySQL database and then check that it was replicated into the PostgreSQL database:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
 Doe       | 1005 | John       | john.doe@example.com
(5 rows)

Summary

We set up a simple streaming data pipeline to replicate data in near real time from a MySQL database to a PostgreSQL database. We accomplished this using Kafka Connect, the Debezium MySQL source connector, the Confluent JDBC sink connector, and a few SMTs, all without having to write any code. And since it is a streaming system, it will continue to capture all changes made to the MySQL database and replicate them in near real time.

What’s next?

In a future blog post we will reproduce the same scenario with Elasticsearch as a target for events.
