Last updated on Nov 21st, 2018 (adjusted to the new KSQL Docker images).
Last year saw the inception of a new open-source project in the Apache Kafka universe: KSQL, a streaming SQL engine built on top of Kafka Streams. In this post, we are going to try out KSQL querying with data change events generated by Debezium from a MySQL database.
As a source of data we will use the database and setup from our tutorial. The result of this exercise should be similar to the recent post about aggregating events into domain-driven aggregates.
Entity diagram
First let’s look at the entities in the database and the relations between them.
Figure 1: Entity diagram of the example entities
The picture above shows the full ER diagram for the inventory database in the example MySQL instance. We are going to focus on two entities:
- customers - the list of customers in the system
- orders - the list of orders in the system

There is a 1:n relation between customers and orders, modelled by the purchaser column in the orders table, which is a foreign key to the customers table.
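For illustration, here is a minimal sketch of the two tables in MySQL DDL; the column names match the example database, but the exact types and constraints in the tutorial image may differ:

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL
);

CREATE TABLE orders (
  order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATE NOT NULL,
  purchaser INTEGER NOT NULL,
  quantity INTEGER NOT NULL,
  product_id INTEGER NOT NULL,
  -- the 1:n relation: each order points to exactly one customer
  FOREIGN KEY (purchaser) REFERENCES customers(id)
);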
Configuration
We are going to use a Docker Compose file for the deployment of the environment. The deployment consists of the following Docker images:
- Kafka Connect including the Debezium connectors
- A pre-populated MySQL database, as used in our tutorial
- The KSQL server and the CLI client
Example
First we need to start the Debezium and Kafka infrastructure. To do so, clone the debezium-examples GitHub repository and start the required components using the provided Compose file:
export DEBEZIUM_VERSION=0.8
git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/ksql/
docker-compose up
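Before registering the connector, you can optionally verify that Kafka Connect's REST API is reachable (it may take a short while for all services to start); the list of connectors should initially be empty:

curl -H "Accept:application/json" http://localhost:8083/connectors/
[]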
Next we must register an instance of the Debezium MySQL connector to listen to changes in the database:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<-EOF
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184055",
        "database.server.name": "dbserver",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
EOF
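To confirm that the connector has been created and its task is running, you can query Kafka Connect's status endpoint:

curl -s http://localhost:8083/connectors/inventory-connector/status

The response should report both the connector and its single task as RUNNING.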
Now we should have all components up and running, and the initial data change events have already been streamed into Kafka topics. A few of the connector properties are especially important for our use case:
- The UnwrapFromEnvelope SMT is used. This allows us to map fields from the after part of change records directly into KSQL statements. Without it, we would need to use EXTRACTJSONFIELD for each field to be extracted from the after part of messages (see the example following this list).
- Schemas are disabled for the JSON converter. The reason is the same as above. With schemas enabled, the JSON record is encapsulated in a JSON structure that contains the fields schema (with schema information) and payload (with the actual data itself). We would again need to use EXTRACTJSONFIELD to get to the relevant fields. There is no such issue with the Avro converter, so this option does not need to be set when Avro is used.
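To make this concrete: with the unwrap SMT applied and schemas disabled, a value on the dbserver.inventory.orders topic is simply the after state of the row (the values below are taken from the sample data shown later in this post):

{"order_number": 10001, "order_date": 16816, "purchaser": 1001, "quantity": 1, "product_id": 102}

Without the SMT, the same information would be buried inside Debezium's change event envelope, which also carries the before state, source metadata, the operation type and a timestamp.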
Next we are going to start the KSQL command shell. We will run the CLI in the ksql-cli container and connect it to the KSQL server. Note that all containers are started from the same Docker Compose file, so the KSQL containers run in the same network as the Debezium containers and DNS resolution between them works properly.
docker-compose exec ksql-cli ksql http://ksql-server:8088
First we will list all Kafka topics that exist in the broker:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
connect-status | false | 5 | 1
dbserver | false | 1 | 1
dbserver.inventory.addresses | false | 1 | 1
dbserver.inventory.customers | false | 1 | 1
dbserver.inventory.orders | false | 1 | 1
dbserver.inventory.products | false | 1 | 1
dbserver.inventory.products_on_hand | false | 1 | 1
ksql__commands | true | 1 | 1
my_connect_configs | false | 1 | 1
my_connect_offsets | false | 25 | 1
schema-changes.inventory | false | 1 | 1
The topics we are interested in are dbserver.inventory.orders and dbserver.inventory.customers.
By default, KSQL processing starts from the latest offsets. Since we want to process the events already present in the topics, we switch processing to the earliest offsets.
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
First we need to create streams from the topics containing the Debezium data change events. A stream in KSQL and Kafka Streams terminology is an unbounded incoming data set with no state.
ksql> CREATE STREAM orders_from_debezium (order_number integer, order_date string, purchaser integer, quantity integer, product_id integer) WITH (KAFKA_TOPIC='dbserver.inventory.orders',VALUE_FORMAT='json');
Message
----------------
Stream created
ksql>
ksql> CREATE STREAM customers_from_debezium (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='dbserver.inventory.customers',VALUE_FORMAT='json');
Message
----------------
Stream created
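If you want to double-check the schema KSQL has derived for a stream, the DESCRIBE statement lists its columns and types, including the implicit ROWTIME and ROWKEY system columns:

ksql> DESCRIBE orders_from_debezium;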
Partitioning
Our deployment uses only one partition per topic. In a production system there will likely be multiple partitions per topic and we need to ensure that all events belonging to our aggregated object end up in the same partition. The natural partioning in our case is per customer id. We are going to repartition the orders_from_debezium
stream according to the purchaser
field that contains the customer id. The repartitioned data are written into a new topic ORDERS_REPART
:
ksql> CREATE STREAM orders WITH (KAFKA_TOPIC='ORDERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM orders_from_debezium PARTITION BY PURCHASER;
Message
----------------------------
Stream created and running
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
ORDERS_REPART | true | 1 | 1
...
We are going to execute the same operation for customers too. It is necessary for two reasons:
- The current key is a struct that contains a field named id with the customer id. This is different from the repartitioned orders topic, which contains only the id value as the key, so the partitions would not match.
- When we create a JOIN later, there is a limitation that requires the key to have the same value as a key field in the table. The table field contains a plain value but the key contains a struct, so they would not match. See this KSQL issue for more details.
ksql> CREATE STREAM customers_stream WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM customers_from_debezium PARTITION BY ID;
Message
----------------------------
Stream created and running
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
CUSTOMERS_REPART | true | 1 | 1
...
To verify that records have a new key and are thus repartitioned, we can issue a few statements and compare the results:
ksql> SELECT * FROM orders_from_debezium LIMIT 1;
1524034842810 | {"order_number":10001} | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated
ksql> SELECT * FROM orders LIMIT 1;
1524034842810 | 1001 | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated
The second column contains ROWKEY, which is the key of the message.
Customer/order join
So far we have only been declaring streams as unbounded stateless data sets. In our use case, an order is really an event that comes and goes. But a customer is an entity that can be updated and generally is part of the state of the system. Such a quality is represented in KSQL or Kafka Streams as a table. We are going to create a table of customers from the topic containing the repartitioned customers.
ksql> CREATE TABLE customers (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',KEY='id');
Message
---------------
Table created
Now we have everything in place to join customers with their orders and to create a query that will monitor incoming orders and list them together with the associated customer fields.
ksql> SELECT order_number,quantity,customers.first_name,customers.last_name FROM orders left join customers on orders.purchaser=customers.id;
10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker
Let’s apply a few changes to the database, which will result in corresponding CDC events being emitted by Debezium:
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> INSERT INTO orders VALUES(default,NOW(), 1003,5,101);
Query OK, 1 row affected, 1 warning (0.02 sec)
mysql> UPDATE customers SET first_name='Annie' WHERE id=1004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> UPDATE orders SET quantity=20 WHERE order_number=10004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
You may notice that only changes in the orders table have triggered changes in the joined stream. This is a consequence of the stream/table join: updates on the table side do not produce new output rows, only new events on the stream side do. We would need a stream/stream join to trigger changes when either of the input sources is modified (a sketch follows below).
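As an illustrative sketch only (not needed for this example), a stream/stream join between the two repartitioned streams could look like the statement below; note that stream/stream joins in KSQL must be windowed, so only events falling into the given time window are matched:

ksql> SELECT o.order_number, o.quantity, c.first_name, c.last_name FROM orders o LEFT JOIN customers_stream c WITHIN 1 HOURS ON o.purchaser = c.id;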
So the final result of the SELECT after the database is modified is:
10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker
10005 | 5 | Edward | Walker
10004 | 20 | Edward | Walker
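If other applications should consume the joined records, the query can also be persisted, which writes its results continuously into a Kafka topic. A possible sketch, with a stream and topic name of our own choosing:

ksql> CREATE STREAM orders_with_customers WITH (KAFKA_TOPIC='ORDERS_WITH_CUSTOMERS',VALUE_FORMAT='json') AS SELECT order_number, quantity, customers.first_name, customers.last_name FROM orders LEFT JOIN customers ON orders.purchaser=customers.id;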
Summary
We have successfully started a KSQL instance. We have mapped KSQL streams to the topics filled by Debezium and created a join between them. We have also discussed the problem of repartitioning in streaming applications.
If you’d like to try out this example with Avro encoding and a schema registry, you can use our Avro example. For further details and more advanced usages, refer to the KSQL syntax reference.
In case you need help, have feature requests or would like to share your experiences with this example, please let us know in the comments below.
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.