Querying Debezium Change Data Events With KSQL

Last year saw the inception of a new open-source project in the Apache Kafka universe: KSQL, a streaming SQL engine built on top of Kafka Streams. In this post, we are going to try out KSQL by querying data change events generated by Debezium from a MySQL database.

As a source of data we will use the database and setup from our tutorial. The result of this exercise should be similar to the recent post about aggregating events into domain-driven aggregates.

Entity diagram

First let’s look at the entities in the database and the relations between them.

Figure 1: Entity diagram of the example entities

The picture above shows the full ER diagram for the inventory database in the example MySQL instance. We are going to focus on two entities:

  • customers - the list of customers in the system

  • orders - the list of orders in the system

There is a 1:n relation between customers and orders, modelled by the purchaser column in the orders table, which is a foreign key to the customers table.

Configuration

We are going to use a Docker Compose file for the deployment of the environment. The deployment consists of the following Docker images: ZooKeeper, Kafka, a MySQL instance pre-populated with the example inventory database, and Kafka Connect with the Debezium connectors installed.

We also need the KSQL client. To make things simple, we are going to use a pre-built Docker image, but you can also download the client directly from the KSQL download page.

Example

First we need to start the Debezium and Kafka infrastructure. To do so, clone the debezium-examples GitHub repository and start the required components using the provided Compose file:

export DEBEZIUM_VERSION=0.7
git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/tutorial/
docker-compose -f docker-compose-mysql.yaml up

Next we must register an instance of the Debezium MySQL connector to listen to changes in the database:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<-EOF
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184055",
        "database.server.name": "dbserver",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
EOF

Now we should have all components up and running, and the initial data change events have already been streamed into Kafka topics. Several of the connector properties above are especially important for our use case:

  • The UnwrapFromEnvelope SMT is used. This allows us to directly map fields from the after part of change records into KSQL statements. Without it, we would need to use EXTRACTJSONFIELD for each field to be extracted from the after part of messages (an example of the envelope structure follows after this list).

  • Schemas are disabled for the JSON converter, for the same reason. With schemas enabled, a JSON record is encapsulated in a structure containing the fields schema (with schema information) and payload (with the actual data itself), and we would again need to use EXTRACTJSONFIELD to get to the relevant fields. There is no such issue with the Avro converter, so this option does not need to be set when Avro is used.
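
To see why this matters, here is an abbreviated, illustrative change event for an order insert as it would look without the unwrap SMT applied (the values are made up to match the example data, and the source block is shortened):

{
    "before": null,
    "after": {
        "order_number": 10001,
        "order_date": 16816,
        "purchaser": 1001,
        "quantity": 1,
        "product_id": 102
    },
    "source": {
        "name": "dbserver",
        "db": "inventory",
        "table": "orders"
    },
    "op": "c",
    "ts_ms": 1524034842810
}

With the SMT in place, only the contents of after are propagated, so the KSQL column names can map directly to the table columns.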

Next we are going to start the KSQL command shell. We will run a local engine in the CLI. Also note the --net parameter, which guarantees that the KSQL container runs in the same network as the Debezium containers and allows proper DNS resolution.

docker run -it --net tutorial_default confluentinc/ksql-cli ksql-cli local --bootstrap-server kafka:9092

First we will list all Kafka topics that exist in the broker:

ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
 connect-status                      | false      | 5          | 1
 dbserver                            | false      | 1          | 1
 dbserver.inventory.addresses        | false      | 1          | 1
 dbserver.inventory.customers        | false      | 1          | 1
 dbserver.inventory.orders           | false      | 1          | 1
 dbserver.inventory.products         | false      | 1          | 1
 dbserver.inventory.products_on_hand | false      | 1          | 1
 ksql__commands                      | true       | 1          | 1
 my_connect_configs                  | false      | 1          | 1
 my_connect_offsets                  | false      | 25         | 1
 schema-changes.inventory            | false      | 1          | 1

The topics we are interested in are dbserver.inventory.orders and dbserver.inventory.customers.

KSQL processing by default starts with the latest offsets. We want to process the events already in the topics, so we switch processing to the earliest offsets.

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

First we need to create streams from the topics containing the Debezium data change events. A stream in KSQL and Kafka Streams terminology is an unbounded incoming data set with no state.

ksql> CREATE STREAM orders_from_debezium (order_number integer, order_date string, purchaser integer, quantity integer, product_id integer) WITH (KAFKA_TOPIC='dbserver.inventory.orders',VALUE_FORMAT='json');

 Message
----------------
 Stream created
ksql>
ksql> CREATE STREAM customers_from_debezium (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='dbserver.inventory.customers',VALUE_FORMAT='json');

 Message
----------------
 Stream created

Partitioning

Our deployment uses only one partition per topic. In a production system there will likely be multiple partitions per topic, and we need to ensure that all events belonging to our aggregated object end up in the same partition. The natural partitioning key in our case is the customer id. We are going to repartition the orders_from_debezium stream according to the purchaser field that contains the customer id. The repartitioned data is written into a new topic, ORDERS_REPART:

ksql> CREATE STREAM orders WITH (KAFKA_TOPIC='ORDERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM orders_from_debezium PARTITION BY PURCHASER;

 Message
----------------------------
 Stream created and running
ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
 ORDERS_REPART                       | true       | 1          | 1
...

We are going to execute the same operation for the customers too. This is necessary for two reasons:

  • The current key is a struct that contains a field named id with the customer id. This is different from the repartitioned order topic which contains only the id value as the key, so the partitions would not match.

  • When we create a JOIN later, there is a limitation that requires the key to have the same value as the key field in the table. The table field contains a plain value but the key contains a struct, so they would not match. See this KSQL issue for more details.

ksql> CREATE STREAM customers_stream WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM customers_from_debezium PARTITION BY ID;

 Message
----------------------------
 Stream created and running
ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
 CUSTOMERS_REPART                    | true       | 1          | 1
...

To verify that the records have a new key and are thus repartitioned, we can issue a few statements to compare the results:

ksql> SELECT * FROM orders_from_debezium LIMIT 1;
1524034842810 | {"order_number":10001} | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated
ksql> SELECT * FROM orders LIMIT 1;
1524034842810 | 1001 | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated

The second column contains ROWKEY, which is the key of the message.

Customer/order join

So far we have only declared streams, i.e. unbounded, stateless data sets. In our use case, an order is really an event that comes and goes. A customer, on the other hand, is an entity that can be updated and is generally part of the state of the system. This quality is represented in KSQL and Kafka Streams as a table. We are going to create a table of customers from the topic containing the repartitioned customers.

ksql> CREATE TABLE customers (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',KEY='id');

 Message
---------------
 Table created

Now we have everything in place to join customers with their orders and to create a query that monitors incoming orders, listing them together with the associated customer fields.

ksql> SELECT order_number,quantity,customers.first_name,customers.last_name FROM orders left join customers on orders.purchaser=customers.id;
10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker

Let’s apply a few changes to the database, which will result in corresponding CDC events being emitted by Debezium:

docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

mysql> INSERT INTO orders VALUES(default,NOW(), 1003,5,101);
Query OK, 1 row affected, 1 warning (0.02 sec)

mysql> UPDATE customers SET first_name='Annie' WHERE id=1004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> UPDATE orders SET quantity=20 WHERE order_number=10004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

You may notice that only changes in the orders table have triggered changes in the joined stream. This is a consequence of the stream/table join: updates to the table side only change the state used for enriching future stream records. We would need a stream/stream join to trigger output when either of the input streams is modified.
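
As an aside, stream/stream joins in KSQL are windowed. A purely illustrative sketch, assuming a KSQL version that supports stream/stream joins (the preview version used here may not), could look like this:

ksql> SELECT o.order_number, o.quantity, c.first_name, c.last_name FROM orders o JOIN customers_stream c WITHIN 1 HOURS ON o.purchaser = c.id;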

So the final result of the SELECT after the database is modified is:

10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker
10005 | 5 | Edward | Walker
10004 | 20 | Edward | Walker

Summary

We have successfully started a KSQL instance, mapped KSQL streams to the topics populated by Debezium, and joined them. We have also discussed the problem of repartitioning in streaming applications.

If you’d like to try out this example with Avro encoding and schema registry then you can use our Avro example. Also for further details and more advanced usages just refer to the KSQL syntax reference.

In case you need help, have feature requests or would like to share your experiences with this example, please let us know in the comments below.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.


Debezium 0.7.5 Is Released

It’s my pleasure to announce the release of Debezium 0.7.5!

This is a bugfix release to the 0.7 release line, which we decided to do while working towards Debezium 0.8. Most notably it fixes an unfortunate bug introduced in 0.7.3 (DBZ-663), where the internal database history topic of the Debezium MySQL connector could be partly deleted under some specific conditions. Please see the dedicated blog post on this issue to find out whether this affects you and what you should do to prevent this issue.

Together with this, we released a couple of other fixes and improvements. Thanks to Maciej Brynski, the performance of the logical table routing SMT has been improved significantly (DBZ-655). Another fix contributed by Maciej is for DBZ-646, which lets the MySQL connector handle CREATE TABLE statements for the TokuDB storage engine.

And we got some more bugfixes by our fantastic community: Long-term community member Peter Goransson fixed an issue about the snapshot JMX metrics of the MySQL connector, which are now also accessible after the snapshot has been completed (DBZ-640). Andrew Tongen spotted and fixed an issue for the Debezium embedded engine (DBZ-665) which caused offsets to be committed more often than needed. And Matthias Wessendorf upgraded the Debezium dependencies and Docker images to Apache Kafka 1.0.1 (DBZ-647).

Thank you all for your help!

Please refer to the change log for the complete list of changes in Debezium 0.7.5.

What’s next?

Please see the previous release announcement for the next planned features. Due to the unplanned 0.7.5 release, though, the schedule of the next one will likely be extended a little bit.


A Note On Database History Topic Configuration

A user of the Debezium connector for MySQL informed us about a potential issue with the configuration of the connector’s internal database history topic, which may cause the deletion of parts of that topic (DBZ-663). Please continue reading if you’re using the Debezium MySQL connector in versions 0.7.3 or 0.7.4.

What is the issue about?

In Debezium 0.7.3 we rolled out a feature for creating the database history automatically if it doesn’t exist yet (DBZ-278). While this feature sets the retention time for the topic to an "infinite" period, it doesn’t specify the "retention.bytes" option for the history topic. This may cause parts of the history topic to be deleted in case all of the following conditions are met:

  • You are using versions 0.7.3 or 0.7.4 of the Debezium connector for MySQL

  • The database history topic has been created by the connector (i.e. you haven’t created it yourself)

  • The broker level option "log.retention.bytes" is set to another value than -1 (note that the default is -1, in which case things work as intended)

  • The database history topic grows beyond the threshold configured via "log.retention.bytes"

If the history topic is incomplete, the connector will fail to recover the database history after a restart of the connector and will not continue with reading the MySQL binlog.

How to prevent the issue?

You should either create the database history topic yourself with an infinite retention or alternatively override the "retention.bytes" configuration for the history topic created by the connector:

<KAFKA_DIR>/bin/kafka-configs.sh \
  --zookeeper zookeeper:2181 \
  --entity-type topics \
  --entity-name <DB_HISTORY_TOPIC> \
  --alter \
  --add-config retention.bytes=-1
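
If you prefer to create the history topic yourself upfront, a sketch along these lines should work (the partition count and the infinite retention settings are the important parts; adjust the names to your environment):

<KAFKA_DIR>/bin/kafka-topics.sh \
  --zookeeper zookeeper:2181 \
  --create \
  --topic <DB_HISTORY_TOPIC> \
  --partitions 1 \
  --replication-factor 1 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

The single partition matters, as the connector relies on reading the history topic back in its original order.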

In case parts of the history topic were removed already, you can use the snapshot mode schema_only_recovery to re-create the history topic, provided no schema changes have happened since the last committed offset of the connector. Otherwise, a complete new snapshot should be taken, e.g. by setting up a new connector instance.
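
For illustration, the recovery mode would be set in the connector configuration like this (a fragment, not a complete config):

"snapshot.mode": "schema_only_recovery"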

Next steps

We’ll release Debezium 0.7.5 with a fix for this issue early next week. Note that previously created database history topics should be re-configured as described above. Please don’t hesitate to get in touch in the comments below, the chat room or the mailing list in case you have any further questions on this issue.


Debezium 0.7.4 Is Released

It’s my pleasure to announce the release of Debezium 0.7.4!

Continuing the 0.7 release line, this new version brings several bug fixes and a handful of new features. We recommend this upgrade to all users. When upgrading from earlier versions, please check out the release notes of all versions between the one you’re currently on and 0.7.4 in order to learn about any steps potentially required for upgrading.

New features

In terms of new features, there’s a new mode for handling decimal columns in Postgres and MySQL (DBZ-611). By setting the decimal.handling.mode connector option to string, Debezium will emit decimal and numeric columns as strings. That oftentimes is easier to handle for consumers than the byte-array based representation used by default, while keeping the full precision. As a bonus, string also allows conveying the special numeric values NaN and Infinity, as supported by Postgres. Note that this functionality required an update to Debezium’s logical decoding plug-in, which runs within the Postgres database server. This plug-in must be upgraded to the new version before upgrading the Debezium Postgres connector.
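
As a minimal illustration, the option goes into the connector configuration like any other property (a fragment, not a complete config):

"decimal.handling.mode": "string"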

Speaking of byte arrays, the BYTEA column type in Postgres is now also supported (DBZ-605).

For the MySQL connector, there’s a new option for the snapshotting routine: snapshot.locking.mode (DBZ-602). Setting this to NONE allows skipping any table locks during snapshotting. This should be used if and only if you’re absolutely sure that the tables don’t undergo structural changes (columns added, removed etc.) while the snapshot is taken. If that’s guaranteed, though, the new mode can be a useful tool for increasing overall system performance, as writes by concurrent processes won’t be blocked. That’s especially useful in environments such as Amazon RDS, where the connector otherwise would be required to hold a lock for the entirety of the snapshot. The new option supersedes the existing snapshot.minimal.locks option. Please see the connector documentation for the details. This feature was contributed by our community member Stephen Powis; many thanks to you!
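
Again as a configuration fragment (the lower-case value follows the convention of the other connector options; treat it as an illustration):

"snapshot.locking.mode": "none"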

Bug Fixes

0.7.4 brings multiple fixes related to how numeric columns are handled. E.g. columns without scale couldn’t correctly be processed by the MySQL connector during binlog reading (DBZ-615). That’s fixed now. And when using the Postgres connector, arbitrary precision column values are correctly converted into change data message fields now (DBZ-351).

We also noticed a regression introduced in Debezium 0.6: the field schema for NUMERIC columns was always marked as optional, even if the column was actually declared as NOT NULL. The same applied to the geo-spatial array types on Postgres supported as of Debezium 0.7. This has been fixed with DBZ-635. We don’t expect any impact on consumers from this change (just as before, they’ll always get a value for such a field; only its schema won’t be incorrectly marked as optional any more).

Please see the full change log for more details and the complete list of issues fixed in Debezium 0.7.4.

What’s next?

Following our three weeks release cadence, the next Debezium release is planned for March 28th. We got some exciting changes in the works for that: if things go as planned, we’ll release the first version of our Oracle connector (DBZ-20). This will be based on the Oracle XStream API in the first iteration and not support snapshots yet. But we felt it’d make sense to roll out this connector incrementally, so to get out the new feature early on and collect feedback on it. We’ve also planned to explore alternatives to using the XStream API in future releases.

Another great new feature will be Reactive Streams support (DBZ-566). Based on top of the existing embedded mode, this will make it very easy to consume change data events using Reactive Streams implementations such as RxJava 2, the Java 9 Flow API and many more. It’ll also be very useful to consume change events in reactive frameworks such as Vert.x. We’re really looking forward to shipping this feature and already have a pending pull request for it. If you like, take a look and let us know about your feedback!

Please also check out our roadmap for the coming months of Debezium’s development. This is our current plan for the things we’ll work on, but it’s not cast in stone, so please tell us about your feature requests by sending a message to our Google group. We’re looking forward to your feedback!


Debezium 0.7.3 Is Released

I’m very happy to announce the release of Debezium 0.7.3!

This is primarily a bugfix release, but we’ve also added a handful of smaller new features. It’s a recommended upgrade for all users. When upgrading from earlier versions, please check out the release notes of all versions between the one you’re currently on and 0.7.3 in order to learn about any steps potentially required for upgrading.

Let’s take a closer look at some of the new features.

All Connectors

Using the new connector option tombstones.on.delete you can now control whether a tombstone event should be emitted upon record deletions or not (DBZ-582). Doing so is usually the right thing and thus remains the default behaviour. But disabling tombstones may be desirable in certain situations, and this gets a bit easier now using that option (previously you’d have to use a single message transform (SMT), which for instance isn’t supported when using Debezium’s embedded mode). This feature was contributed by our community member Raf Liwoch. Thanks!
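
As a small, illustrative configuration fragment:

"tombstones.on.delete": "false"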

We’ve also spent some time on a few operational aspects: The sourceInfo element of Debezium’s change data messages contains a new field representing the version of the connector that created the message (DBZ-593). This lets message consumers take specific action based on the version. For instance this can be helpful where a new Debezium release fixes a bug, which consumers could work around so far. Now, after the update to that new Debezium version, that workaround should not be applied anymore. The version field will allow consumers to decide whether to apply the workaround or not.
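
For illustration, the sourceInfo block of a MySQL change event might then look roughly like this (abbreviated, with made-up values):

"source": {
    "version": "0.7.3",
    "name": "dbserver",
    "file": "mysql-bin.000003",
    "pos": 154,
    "db": "inventory",
    "table": "orders"
}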

The names of all the threads managed by Debezium are now structured in the form of "debezium-<connector>-…" (DBZ-587). This helps with identifying Debezium’s threads when analyzing thread dumps, for instance.
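
A quick way to make use of this when inspecting a running Kafka Connect worker (assuming jstack is available and <pid> is the worker’s process id):

jstack <pid> | grep 'debezium-'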

Postgres Connector

Here we’ve focused on improving the support for array types: besides fixing a bug related to numeric arrays (DBZ-577) we’ve also completed the support for the PostGIS types (which was introduced in 0.7.2), allowing you to capture array columns of types GEOMETRY and GEOGRAPHY.

Snapshots are now correctly interruptable (DBZ-586) and the connector will correctly handle the case where after a restart it should continue from a WAL position which isn’t available any more: it’ll stop, requiring you to do a new snapshot (DBZ-590).

MySQL Connector

The MySQL connector can create the DB history topic automatically, if needed (DBZ-278). This means you don’t have to create that topic yourself and you also don’t need to rely on Kafka’s automatic topic creation any longer (any change data topics will automatically be created by Kafka Connect).

Also, the connector can optionally emit messages to a dedicated heartbeat topic in a configurable interval (DBZ-220). This comes in handy in situations where you only want to capture tables with low traffic, while other tables in the database are changed more frequently. In that case, no messages would have been emitted to Kafka Connect for a long time, and thus no offset would have been committed either. This could have caused trouble when restarting the connector: it wanted to resume from the last committed offset, which may not be available in the binlogs any longer. But as the captured tables didn’t change, it actually wouldn’t be necessary to resume from such an old binlog position. This all is avoided by emitting messages to the heartbeat topic regularly, which causes the last offset the connector has seen to be committed.
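
As a hedged configuration sketch (the interval value is an example; choose one that is comfortably shorter than your binlog retention):

"heartbeat.interval.ms": "10000"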

We’ll roll out this change to the other connectors, too, in future releases.

What’s next?

Please see the full change log for more details and the complete list of issues fixed in Debezium 0.7.3.

The next release is scheduled for March 7th. We’ll still have to decide whether that will be 0.7.4 or 0.8.0, depending on how far we are by then with our work on the Oracle connector (DBZ-137).

Please also see our roadmap describing our ideas for the future development of Debezium. This is our current thinking of the things we’d like to tackle in the coming months, but it’s not cast in stone, so please let us know about your feature requests by sending a message to our Google group. We’re looking forward to your feedback!
