Tutorial
This tutorial demonstrates how to use Debezium to monitor a MySQL database. As the data in the database changes, you will see the resulting event streams.
In this tutorial you will start the Debezium services, run a MySQL database server with a simple example database, and use Debezium to monitor the database for changes.
-
Docker is installed and running.
This tutorial uses Docker and the Debezium container images to run the required services. You should use the latest version of Docker. For more information, see the Docker Engine installation documentation.
This example can also be run using Podman. For more information, see Podman.
Introduction to Debezium
Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases.
Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors. Each of the connectors works with a specific database management system (DBMS). Connectors record the history of data changes in the DBMS by detecting changes as they occur, and streaming a record of each change event to a Kafka topic. Consuming applications can then read the resulting event records from the Kafka topic.
By taking advantage of Kafka’s reliable streaming platform, Debezium makes it possible for applications to consume changes that occur in a database correctly and completely. Even if your application stops unexpectedly, or loses its connection, it does not miss events that occur during the outage. After the application restarts, it resumes reading from the topic from the point where it left off.
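As a rough illustration of the resume behavior described above, the following Python sketch models a topic as an in-memory list and a consumer that remembers the offset of the next record it should read. It does not use Kafka at all: the Topic and Consumer classes and the event strings are hypothetical names invented for this example, and real Kafka consumers track offsets per partition and commit them to the broker.

```python
class Topic:
    """An append-only list of change events, standing in for a Kafka topic."""

    def __init__(self):
        self.records = []

    def append(self, record):
        self.records.append(record)


class Consumer:
    """Reads from a Topic and remembers where it left off."""

    def __init__(self, topic):
        self.topic = topic
        self.next_offset = 0  # offset of the first record not yet consumed

    def poll(self):
        # Return everything written since the last poll, then advance the offset.
        batch = self.topic.records[self.next_offset:]
        self.next_offset = len(self.topic.records)
        return batch


topic = Topic()
consumer = Consumer(topic)

topic.append("insert customer 1001")
first_batch = consumer.poll()

# The application is "down" here, but events keep arriving in the topic.
topic.append("update customer 1001")
topic.append("delete customer 1001")

# After the restart, the consumer picks up exactly the events it missed.
second_batch = consumer.poll()
print(first_batch)
print(second_batch)
```

Because the position is stored separately from the consuming process, an outage delays delivery of events but does not drop them.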
The tutorial that follows shows you how to deploy and use the Debezium MySQL connector with a simple configuration. For more information about deploying and using Debezium connectors, see the connector documentation.
Starting the services
Using Debezium requires three separate services: ZooKeeper, Kafka, and the Debezium connector service. In this tutorial, you will set up a single instance of each service using Docker and the Debezium container images.
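To start the services needed for this tutorial, you must start ZooKeeper, Kafka, a MySQL database, a MySQL command line client, and Kafka Connect, in that order.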
Considerations for running Debezium with Docker
This tutorial uses Docker and the Debezium container images to run the ZooKeeper, Kafka, Debezium, and MySQL services. Running each service in a separate container simplifies the setup so that you can see Debezium in action.
In a production environment, you would run multiple instances of each service to provide performance, reliability, replication, and fault tolerance. Typically, you would either deploy these services on a platform like OpenShift or Kubernetes that manages multiple Docker containers running on multiple hosts and machines, or you would install on dedicated hardware.
You should be aware of the following considerations for running Debezium with Docker:
-
The containers for ZooKeeper and Kafka are ephemeral.
ZooKeeper and Kafka would typically store their data locally inside the containers, which would require you to mount directories on the host machine as volumes. That way, when the containers are stopped, the persisted data remains. However, this tutorial skips this setup: when a container is stopped, all persisted data is lost. This way, cleanup is simple when you complete the tutorial.
For more information about storing persistent data, see the documentation for the container images.
-
This tutorial requires you to run each service in a different container.
To avoid confusion, you will run each container in the foreground in a separate terminal. This way, all of the output of a container will be displayed in the terminal used to run it.
Docker also allows you to run a container in detached mode (with the -d option), where the container is started and the docker command returns immediately. However, detached mode containers do not display their output in the terminal. To see the output, you would need to use the docker logs --follow <container-name> command. For more information, see the Docker documentation.
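To make the difference between foreground and detached mode concrete, here is a minimal Python sketch that only builds the argument lists for the docker commands; it does not start any containers. The helper names docker_run_args and docker_logs_args are hypothetical and exist only for this example. Note that docker logs takes the container name as a positional argument.

```python
def docker_run_args(name, image, ports=(), detached=False):
    """Build an argument list for `docker run` in the style used by this tutorial."""
    args = ["docker", "run"]
    # -d starts the container in the background; -it keeps it attached to the terminal.
    args.append("-d" if detached else "-it")
    args += ["--rm", "--name", name]
    for port in ports:
        # Map each container port to the same port on the Docker host.
        args += ["-p", f"{port}:{port}"]
    args.append(image)
    return args


def docker_logs_args(name):
    """Build the argument list that follows the output of a detached container."""
    return ["docker", "logs", "--follow", name]


image = "quay.io/debezium/zookeeper:1.9"
foreground = docker_run_args("zookeeper", image, ports=(2181, 2888, 3888))
background = docker_run_args("zookeeper", image, ports=(2181, 2888, 3888), detached=True)

print(" ".join(foreground))
print(" ".join(background))
print(" ".join(docker_logs_args("zookeeper")))
```

Passing one of these lists to subprocess.run would launch the command, assuming Docker is installed and running.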
Starting ZooKeeper
ZooKeeper is the first service you must start.
-
Open a terminal and use it to start ZooKeeper in a container.
This command runs a new container using version 1.9 of the quay.io/debezium/zookeeper image:
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name zookeeper
The name of the container.
-p 2181:2181 -p 2888:2888 -p 3888:3888
Maps three of the container’s ports to the same ports on the Docker host. This enables other containers (and applications outside of the container) to communicate with ZooKeeper.
If you use Podman, run the following command:
-
Verify that ZooKeeper started and is listening on port 2181.
You should see output similar to the following:
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
...
port 0.0.0.0/0.0.0.0:2181 (1)
(1) This line indicates that ZooKeeper is ready and listening on port 2181. The terminal will continue to show additional output as ZooKeeper generates it.
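Besides reading the log output, you can check from the Docker host whether something is accepting connections on the mapped port. The following is a minimal sketch using only the Python standard library; the function name port_is_open is an assumption made for this example. A successful TCP connection shows that the port is open, not that ZooKeeper itself is healthy.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False


if __name__ == "__main__":
    # Port 2181 is the ZooKeeper port mapped in the docker run command.
    print("ZooKeeper port reachable:", port_is_open("localhost", 2181))
```

The same check can be pointed at 9092 for Kafka, 3306 for MySQL, or 8083 for Kafka Connect once those containers are running.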
Starting Kafka
After starting ZooKeeper, you can start Kafka in a new container.
Debezium 1.9.8.Final has been tested against multiple versions of Kafka Connect. Please refer to the Debezium Test Matrix to determine compatibility between Debezium and Kafka Connect.
-
Open a new terminal and use it to start Kafka in a container.
This command runs a new container using version 1.9 of the quay.io/debezium/kafka image:
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name kafka
The name of the container.
-p 9092:9092
Maps port 9092 in the container to the same port on the Docker host so that applications outside of the container can communicate with Kafka.
--link zookeeper:zookeeper
Tells the container that it can find ZooKeeper in the zookeeper container, which is running on the same Docker host.
If you use Podman, run the following command:
$ sudo podman run -it --rm --name kafka --pod dbz quay.io/debezium/kafka:1.9
In this tutorial, you will always connect to Kafka from within a Docker container. Any of these containers can communicate with the kafka container by linking to it. If you needed to connect to Kafka from outside of a Docker container, you would have to set the -e option to advertise the Kafka address through the Docker host (-e ADVERTISED_HOST_NAME= followed by either the IP address or resolvable host name of the Docker host).
-
Verify that Kafka started.
You should see output similar to the following:
...
2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
...
2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started (1)
(1) The Kafka broker has successfully started and is ready for client connections. The terminal will continue to show additional output as Kafka generates it.
Starting a MySQL database
At this point, you have started ZooKeeper and Kafka, but you still need a database server from which Debezium can capture changes. In this procedure, you will start a MySQL server with an example database.
-
Open a new terminal, and use it to start a new container that runs a MySQL database server preconfigured with an inventory database.
This command runs a new container using version 1.9 of the quay.io/debezium/example-mysql image, which is based on the mysql:8.0 image. It also defines and populates a sample inventory database:
$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name mysql
The name of the container.
-p 3306:3306
Maps port 3306 (the default MySQL port) in the container to the same port on the Docker host so that applications outside of the container can connect to the database server.
-e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw
Creates a user and password that has the minimum privileges required by the Debezium MySQL connector.
If you use Podman, run the following command:
-
Verify that the MySQL server starts.
The MySQL server starts and stops a few times as the configuration is modified. You should see output similar to the following:
...
[System] [MY-010931] [Server] /usr/sbin/mysqld: ready for connections. Version: '8.0.27' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server - GPL.
[System] [MY-011323] [Server] X Plugin ready for connections. Bind-address: '::' port: 33060, socket: /var/run/mysqld/mysqlx.sock
Starting a MySQL command line client
After starting MySQL, you start a MySQL command line client so that you can access the sample inventory database.
-
Open a new terminal, and use it to start the MySQL command line client in a container.
This command runs a new container using the mysql:8.0 image, and defines a shell command to run the MySQL command line client with the correct options:
$ docker run -it --rm --name mysqlterm --link mysql mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name mysqlterm
The name of the container.
--link mysql
Links the container to the mysql container.
If you use Podman, run the following command:
-
Verify that the MySQL command line client started.
You should see output similar to the following:
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 9
Server version: 8.0.27 MySQL Community Server - GPL
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
-
At the mysql> command prompt, switch to the inventory database:
mysql> use inventory;
-
List the tables in the database:
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)
-
Use the MySQL command line client to explore the database and view the pre-loaded data in the database.
For example:
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
Starting Kafka Connect
After starting MySQL and connecting to the inventory database with the MySQL command line client,
you start the Kafka Connect service.
This service exposes a REST API to manage the Debezium MySQL connector.
-
Open a new terminal, and use it to start the Kafka Connect service in a container.
This command runs a new container using the 1.9 version of the quay.io/debezium/connect image:
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9
-it
The container is interactive, which means the terminal’s standard input and output are attached to the container.
--rm
The container will be removed when it is stopped.
--name connect
The name of the container.
-p 8083:8083
Maps port 8083 in the container to the same port on the Docker host. This enables applications outside of the container to use Kafka Connect’s REST API to set up and manage new connector instances.
-e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses
Sets environment variables required by the Debezium image.
--link kafka:kafka --link mysql:mysql
Links the container to the containers that are running Kafka and the MySQL server.
If you use Podman, run the following command:
|
If you provide a If this is a problem then set environment variable |
-
Verify that Kafka Connect started and is ready to accept connections.
You should see output similar to the following:
...
2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
...
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
-
Use the Kafka Connect REST API to check the status of the Kafka Connect service.
Kafka Connect exposes a REST API to manage Debezium connectors. To communicate with the Kafka Connect service, you can use the curl command to send API requests to port 8083 of the Docker host (which you mapped to port 8083 in the connect container when you started Kafka Connect).
These commands use localhost. If you are using a non-native Docker platform (such as Docker Toolbox), replace localhost with the IP address of your Docker host.
-
Open a new terminal and check the status of the Kafka Connect service:
$ curl -H "Accept:application/json" localhost:8083/
{"version":"3.2.0","commit":"cb8625948210849f"} (1)
(1) The response shows that Kafka Connect version 3.2.0 is running.
-
Check the list of connectors registered with Kafka Connect:
$ curl -H "Accept:application/json" localhost:8083/connectors/
[] (1)
(1) No connectors are currently registered with Kafka Connect.
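The two status checks above can also be scripted. The following Python sketch uses only the standard library; the helper names get_json and describe are assumptions made for this example, and the base URL assumes the port mapping used in this tutorial. So that it runs without the services, the demonstration at the bottom feeds the sample responses shown above into describe instead of calling the live API.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: port 8083 mapped as in this tutorial


def get_json(path, base_url=CONNECT_URL, timeout=5):
    """Send a GET request to the Kafka Connect REST API and decode the JSON response."""
    request = urllib.request.Request(base_url + path, headers={"Accept": "application/json"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def describe(info, connectors):
    """Summarize the responses of the / and /connectors/ resources."""
    return f"Kafka Connect {info['version']} with {len(connectors)} connector(s) registered"


# Offline demonstration using the sample responses from this tutorial.
sample_info = json.loads('{"version":"3.2.0","commit":"cb8625948210849f"}')
sample_connectors = json.loads("[]")
summary = describe(sample_info, sample_connectors)
print(summary)
```

Against the running services, describe(get_json("/"), get_json("/connectors/")) would perform the same two checks live.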
Deploying the MySQL connector
After starting the Debezium and MySQL services,
you are ready to deploy the Debezium MySQL connector so that it can start monitoring the sample MySQL database (inventory).
At this point, you are running the Debezium services,
a MySQL database server with a sample inventory database,
and the MySQL command line client that is connected to the database.
To deploy the MySQL connector,
you must:
-
Register the MySQL connector to monitor the inventory database
After the connector is registered, it will start monitoring the database server’s binlog and it will generate change events for each row that changes.
-
Watch the MySQL connector start
Reviewing the Kafka Connect log output as the connector starts helps you to better understand each task it must complete before it can start monitoring the binlog.
Registering a connector to monitor the inventory database
When you register the Debezium MySQL connector,
it starts monitoring the MySQL database server’s binlog.
The binlog records all of the database’s transactions (such as changes to individual rows and changes to the schemas).
When a row in the database changes,
Debezium generates a change event.
In a production environment, you would typically either use the Kafka tools to manually create the necessary topics, including specifying the number of replicas, or you’d use the Kafka Connect mechanism for customizing the settings of auto-created topics. However, for this tutorial, Kafka is configured to automatically create the topics with just one replica.
-
Review the configuration of the Debezium MySQL connector that you will register.
Before registering the connector, you should be familiar with its configuration. In the next step, you will register the following connector:
{
  "name": "inventory-connector", (1)
  "config": { (2)
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1", (3)
    "database.hostname": "mysql", (4)
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054", (5)
    "database.server.name": "dbserver1", (5)
    "database.include.list": "inventory", (6)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (7)
    "database.history.kafka.topic": "dbhistory.inventory" (7)
  }
}
(1) The name of the connector.
(2) The connector’s configuration.
(3) Only one task should operate at any one time. Because the MySQL connector reads the MySQL server’s binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
(4) The database host, which is the name of the Docker container running the MySQL server (mysql). Docker manipulates the network stack within the containers so that each linked container can be resolved with /etc/hosts using the container name for the host name. If MySQL were running on a normal network, you would specify the IP address or resolvable host name for this value.
(5) A unique server ID and name. The server name is the logical identifier for the MySQL server or cluster of servers. This name will be used as the prefix for all Kafka topics.
(6) Only changes in the inventory database will be detected.
(7) The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.
For more information, see MySQL connector configuration properties.
For security reasons, you shouldn’t put passwords or other secrets in plain text into connector configurations. Instead, any secrets should be externalized via the mechanism defined in KIP-297 ("Externalizing Secrets for Connect Configurations").
-
Open a new terminal, and use the curl command to register the Debezium MySQL connector.
This command uses the Kafka Connect service’s API to submit a POST request against the /connectors resource with a JSON document that describes the new connector (called inventory-connector).
This command uses localhost to connect to the Docker host. If you are using a non-native Docker platform, replace localhost with the IP address of your Docker host.
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
Windows users may need to escape the double-quotes. For example:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"database.server.name\": \"dbserver1\", \"database.include.list\": \"inventory\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.inventory\" } }'
Otherwise, you might see an error like the following:
{"error_code":500,"message":"Unexpected character ('n' (code 110)): was expecting double-quote to start field name\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 4]"}
If you use Podman, run the following command:
-
Verify that inventory-connector is included in the list of connectors:
$ curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
-
Review the connector’s tasks:
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
You should see a response similar to the following (formatted for readability):
HTTP/1.1 200 OK
Date: Thu, 06 Feb 2020 22:12:03 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.4.20.v20190813)

{
  "name": "inventory-connector",
  ...
  "tasks": [
    {
      "connector": "inventory-connector", (1)
      "task": 0
    }
  ]
}
(1) The connector is running a single task (task 0) to do its work. The connector only supports a single task, because MySQL records all of its activities in one sequential binlog. This means the connector only needs one reader to get a consistent, ordered view of all of the events.
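The registration and task review steps above can be sketched in Python as well, which avoids the shell quoting issue because the JSON document is serialized by the json module. The helper names build_register_request and task_ids are hypothetical, and the base URL assumes the port mapping from this tutorial. The sketch only builds the request; the line that would send it is left as a comment so that the example runs without the services.

```python
import json
import urllib.request

# The connector definition from this tutorial.
CONNECTOR = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory",
    },
}


def build_register_request(connector, base_url="http://localhost:8083"):
    """Build (but do not send) the POST request that registers a connector."""
    return urllib.request.Request(
        base_url + "/connectors/",
        data=json.dumps(connector).encode("utf-8"),
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def task_ids(connector_info):
    """Extract the task numbers from a GET /connectors/<name> response body."""
    return [task["task"] for task in connector_info.get("tasks", [])]


request = build_register_request(CONNECTOR)
# urllib.request.urlopen(request) would submit it to a running Kafka Connect service.
print(request.get_method(), request.full_url)

# Sample response body, abbreviated from the one shown in this tutorial.
sample = {"name": "inventory-connector", "tasks": [{"connector": "inventory-connector", "task": 0}]}
print(task_ids(sample))
```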
Watching the connector start
When you register a connector,
it generates a large amount of log output in the Kafka Connect container.
By reviewing this output,
you can better understand the process that the connector goes through from the time it is created until it begins reading the MySQL server’s binlog.
After registering the inventory-connector connector,
you can review the log output in the Kafka Connect container (connect) to track the connector’s status.
The first few lines show the connector (inventory-connector) being created and started:
...
2021-11-30 01:38:44,223 INFO || [Worker clientId=connect-1, groupId=1] Tasks [inventory-connector-0] configs updated [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Handling task config update by restarting tasks [] [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Rebalance started [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] (Re-)joining group [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,227 INFO || [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,230 INFO || [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,231 INFO || [Worker clientId=connect-1, groupId=1] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', leaderUrl='http://172.17.0.7:8083/', offset=4, connectorIds=[inventory-connector], taskIds=[inventory-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 4 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting task inventory-connector-0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
...
Further down, you should see output like the following from the connector:
...
2021-11-30 01:38:44,406 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,406 INFO || Kafka commitId: 8cb0a5e9d3441962 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,407 INFO || Kafka startTimeMs: 1638236324406 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,437 INFO || Database history topic '(name=dbhistory.inventory, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.relational.history.KafkaDatabaseHistory]
2021-11-30 01:38:44,497 INFO || App info kafka.admin.client for dbserver1-dbhistory unregistered [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,499 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Reconnecting after finishing schema recovery [io.debezium.connector.mysql.MySqlConnectorTask]
2021-11-30 01:38:44,524 INFO || Requested thread factory for connector MySqlConnector, id = dbserver1 named = change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:38:44,525 INFO || Creating thread debezium-mysqlconnector-dbserver1-change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:38:44,526 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-11-30 01:38:44,529 INFO MySQL|dbserver1|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:38:44,529 INFO MySQL|dbserver1|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot No previous offset has been found [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot According to the connector configuration both schema and data will be snapshotted [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
...
The Debezium log output uses mapped diagnostic contexts (MDC) to provide thread-specific information in the log output,
and make it easier to understand what is happening in the multi-threaded Kafka Connect service.
This includes the connector type (MySQL in the above log messages),
the logical name of the connector (dbserver1 above),
and the connector’s activity (task, snapshot and binlog).
In the log output above,
the first few lines involve the task activity of the connector,
and report some bookkeeping information (in this case, that the connector was started with no prior offset).
The next three lines involve the snapshot activity of the connector,
and report that a snapshot is being started using the debezium MySQL user along with the MySQL grants associated with that user.
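To show how the MDC fields can be pulled out of a log line, here is a minimal Python sketch. The regular expression is inferred from the sample lines in this tutorial, not from an official format specification, so treat it as an assumption that may need adjusting for other logging configurations; the names LOG_LINE and parse_log_line are hypothetical.

```python
import re

# timestamp, level, "type|name|activity" context, message, [logger]
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+"
    r"(?P<level>[A-Z]+)\s+"
    r"(?P<connector_type>[^|\s]*)\|(?P<logical_name>[^|\s]*)\|(?P<activity>[^|\s]*)\s+"
    r"(?P<message>.*?)\s+\[(?P<logger>[^\]\s]+)\]$"
)


def parse_log_line(line):
    """Return the fields of one log line as a dict, or None if it does not match."""
    match = LOG_LINE.match(line.strip())
    return match.groupdict() if match else None


snapshot_line = (
    "2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing "
    "[io.debezium.relational.RelationalSnapshotChangeEventSource]"
)
worker_line = (
    "2021-11-30 01:38:44,406 INFO || Kafka version: 3.0.0 "
    "[org.apache.kafka.common.utils.AppInfoParser]"
)

parsed = parse_log_line(snapshot_line)
print(parsed["connector_type"], parsed["logical_name"], parsed["activity"])

# Lines logged outside a connector thread carry an empty context (||).
print(parse_log_line(worker_line)["message"])
```

Filtering on the activity field is one way to isolate, for example, only the snapshot messages of a given connector from the interleaved Kafka Connect output.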
|
If the connector is not able to connect,
or if it does not see any tables or the |
Next, the connector reports the steps that make up the snapshot operation:
...
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Snapshot step 2 - Determining captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Read list of available databases [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot list of available databases is: [information_schema, inventory, mysql, performance_schema, sys] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot Read list of available tables in each database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,548 INFO MySQL|dbserver1|snapshot snapshot continuing with database(s): [inventory] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,551 INFO MySQL|dbserver1|snapshot Snapshot step 3 - Locking captured tables [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,552 INFO MySQL|dbserver1|snapshot Flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,557 INFO MySQL|dbserver1|snapshot Snapshot step 4 - Determining snapshot offset [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,560 INFO MySQL|dbserver1|snapshot Read binlog position of MySQL primary server [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot using binlog 'mysql-bin.000003' at position '156' and gtid '' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot Snapshot step 5 - Reading structure of captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot All eligible tables schema should be captured, capturing: [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,058 INFO MySQL|dbserver1|snapshot Reading structure of database 'inventory' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,187 INFO MySQL|dbserver1|snapshot Snapshot step 6 - Persisting schema history [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,273 INFO MySQL|dbserver1|snapshot Releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Writes to MySQL tables prevented for a total of 00:00:00.717 [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Snapshotting contents of 6 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.addresses' (1 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,276 INFO MySQL|dbserver1|snapshot For table 'inventory.addresses' using select statement: 'SELECT `id`, `customer_id`, `street`, `city`, `state`, `zip`, `type` FROM `inventory`.`addresses`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,295 INFO MySQL|dbserver1|snapshot Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.customers' (2 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot For table 'inventory.customers' using select statement: 'SELECT `id`, `first_name`, `last_name`, `email` FROM `inventory`.`customers`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.customers'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.geom' (3 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,305 INFO MySQL|dbserver1|snapshot For table 'inventory.geom' using select statement: 'SELECT `id`, `g`, `h` FROM `inventory`.`geom`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Finished exporting 3 records for table 'inventory.geom'; total duration '00:00:00.011' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.orders' (4 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot For table 'inventory.orders' using select statement: 'SELECT `order_number`, `order_date`, `purchaser`, `quantity`, `product_id` FROM `inventory`.`orders`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.orders'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products' (5 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot For table 'inventory.products' using select statement: 'SELECT `id`, `name`, `description`, `weight` FROM `inventory`.`products`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,343 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products'; total duration '00:00:00.017' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products_on_hand' (6 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot For table 'inventory.products_on_hand' using select statement: 'SELECT `product_id`, `quantity` FROM `inventory`.`products_on_hand`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,353 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products_on_hand'; total duration '00:00:00.009' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,355 INFO MySQL|dbserver1|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
2021-11-30 01:38:45,356 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
Each of these steps reports what the connector is doing to perform the consistent snapshot.
For example, Step 6 involves reverse engineering the DDL create statements for the tables that are being captured
and releasing the global write lock less than 1 second after acquiring it (the log reports that writes were blocked for 0.717 seconds),
and Step 7 reads all of the rows in each of the tables and reports the time taken and number of rows found.
In this case, the connector completed its consistent snapshot in just under 1 second.
|
The snapshot process will take longer with your databases, but the connector outputs enough log messages that you can track what it is working on, even when the tables have a large number of rows. And although an exclusive write lock is used at the beginning of the snapshot process, it should not last very long even for large databases. This is because the lock is released before any data is copied. For more information, see the MySQL connector documentation. |
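When a snapshot covers many tables, it can help to pull the per-table results out of the log instead of reading it line by line. The following is a minimal sketch, assuming the log lines keep the "Finished exporting ..." wording shown above; the function name `snapshot_summary` and the `sample_log` text are illustrative and not part of Debezium.

```python
import re

# Matches the "Finished exporting" lines that Step 7 writes for each table.
# The wording is taken from the log output above and may differ in other versions.
FINISHED = re.compile(
    r"Finished exporting (?P<rows>\d+) records for table '(?P<table>[^']+)'; "
    r"total duration '(?P<duration>[\d:.]+)'"
)


def snapshot_summary(log_text):
    """Return {table: (row_count, duration_string)} for each exported table."""
    summary = {}
    for line in log_text.splitlines():
        match = FINISHED.search(line)
        if match:
            summary[match["table"]] = (int(match["rows"]), match["duration"])
    return summary


# Two lines copied from the connector log shown above.
sample_log = """\
2021-11-30 01:38:45,295 INFO MySQL|dbserver1|snapshot Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.customers'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
"""

summary = snapshot_summary(sample_log)
for table, (rows, duration) in summary.items():
    print(f"{table}: {rows} rows in {duration}")
```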
Next, Kafka Connect reports some "errors". However, you can safely ignore these warnings: these messages just mean that new Kafka topics were created and that Kafka had to assign a new leader for each one:
...
2021-11-30 01:38:45,555 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 3 : {dbserver1=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,691 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 9 : {dbserver1.inventory.addresses=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,813 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 13 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,927 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 18 : {dbserver1.inventory.geom=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,043 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 22 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,153 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 26 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,269 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 31 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
...
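To confirm that these warnings only concern newly created topics, you can list the topic names that they mention. This is a small sketch that assumes the warning format shown above; `topics_awaiting_leader` and `sample_warnings` are hypothetical names used for illustration.

```python
import re

# The topic name appears inside braces, followed by =LEADER_NOT_AVAILABLE.
LEADER_WARNING = re.compile(r"\{(?P<topic>[^=}]+)=LEADER_NOT_AVAILABLE\}")


def topics_awaiting_leader(log_text):
    """Return the topics named in LEADER_NOT_AVAILABLE warnings, in log order."""
    topics = []
    for line in log_text.splitlines():
        match = LEADER_WARNING.search(line)
        if match and match["topic"] not in topics:
            topics.append(match["topic"])
    return topics


# Two warnings copied from the Kafka Connect log shown above.
sample_warnings = """\
2021-11-30 01:38:45,555 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 3 : {dbserver1=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,813 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 13 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
"""

new_topics = topics_awaiting_leader(sample_warnings)
print(new_topics)
```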
Finally, the log output shows that the connector has transitioned from its snapshot mode into continuously reading the MySQL server’s binlog:
...
2021-11-30 01:38:45,362 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
Nov 30, 2021 1:38:45 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/156 (sid:184054, cid:13)
2021-11-30 01:38:45,392 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]] [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,392 INFO MySQL|dbserver1|streaming Waiting for keepalive thread to start [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,393 INFO MySQL|dbserver1|binlog Creating thread debezium-mysqlconnector-dbserver1-binlog-client [io.debezium.util.Threads]
...
Viewing change events
After deploying the Debezium MySQL connector,
it starts monitoring the inventory database for data change events.
When you watched the connector start up,
you saw that events were written to the following topics with the dbserver1 prefix (the logical name of the MySQL server, which the connector uses as the topic prefix):
dbserver1
The schema change topic to which all of the DDL statements are written.
dbserver1.inventory.products
Captures change events for the products table in the inventory database.
dbserver1.inventory.products_on_hand
Captures change events for the products_on_hand table in the inventory database.
dbserver1.inventory.customers
Captures change events for the customers table in the inventory database.
dbserver1.inventory.orders
Captures change events for the orders table in the inventory database.
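The topic names listed above follow a regular pattern: the prefix alone for the schema change topic, and prefix, database, and table joined by dots for each table. The sketch below illustrates that pattern for the names used in this tutorial; the helper functions are hypothetical, and `parse_table_topic` assumes that none of the three parts contains a dot.

```python
def schema_change_topic(prefix):
    """The schema change topic is named after the prefix alone."""
    return prefix


def table_topic(prefix, database, table):
    """Per-table topics join prefix, database, and table with dots."""
    return f"{prefix}.{database}.{table}"


def parse_table_topic(topic):
    """Split a per-table topic name back into (prefix, database, table).

    Assumes that none of the parts contains a dot.
    """
    prefix, database, table = topic.split(".")
    return prefix, database, table


print(schema_change_topic("dbserver1"))
print(table_topic("dbserver1", "inventory", "customers"))
print(parse_table_topic("dbserver1.inventory.products_on_hand"))
```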
For this tutorial, you will explore the dbserver1.inventory.customers topic.
In this topic, you will look at different types of change events to see how the connector captured them.
Viewing a create event
By viewing the dbserver1.inventory.customers topic,
you can see how the MySQL connector captured create events in the inventory database.
In this case, the create events capture new customers being added to the database.
-
Open a new terminal, and use it to start the watch-topic utility to watch the dbserver1.inventory.customers topic from the beginning of the topic.
The watch-topic utility is very simple and limited in functionality. It is not intended to be used by an application to consume events. In that scenario, you would instead use Kafka consumers and the applicable consumer libraries that offer full functionality and flexibility.
This command runs the watch-topic utility in a new container using the 1.9 version of the debezium/kafka image:
$ docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:1.9 watch-topic -a -k dbserver1.inventory.customers
-a
Watches all events since the topic was created. Without this option, watch-topic would only show the events recorded after you start watching.
-k
Specifies that the output should include the event’s key. In this case, this contains the row’s primary key.
If you use Podman, run the following command:
$ sudo podman run -it --rm --name watcher --pod dbz quay.io/debezium/kafka:1.9 watch-topic -a -k dbserver1.inventory.customers
The watch-topic utility returns the event records from the customers table. There are four events, one for each row in the table. Each event is formatted in JSON, because that is how you configured the Kafka Connect service. There are two JSON documents for each event: one for the key, and one for the value.
You should see output similar to the following:
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic dbserver1.inventory.customers:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}
...
This utility keeps watching the topic, so any new events will automatically appear as long as the utility is running.
-
For the last event, review the details of the key.
Here are the details of the key of the last event (formatted for readability):
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "id": 1004
  }
}
The event has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload. In this case, the payload is a struct named dbserver1.inventory.customers.Key that is not optional and has one required field (id of type int32).
The payload has a single id field, with a value of 1004.
By reviewing the key of the event, you can see that this event applies to the row in the inventory.customers table whose id primary key column had a value of 1004.
-
Review the details of the same event’s value.
The event’s value shows that the row was created, and describes what it contains (in this case, the id, first_name, last_name, and email of the inserted row).
Here are the details of the value of the last event (formatted for readability):
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          { "type": "string", "optional": false, "field": "first_name" },
          { "type": "string", "optional": false, "field": "last_name" },
          { "type": "string", "optional": false, "field": "email" }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          { "type": "string", "optional": false, "field": "first_name" },
          { "type": "string", "optional": false, "field": "last_name" },
          { "type": "string", "optional": false, "field": "email" }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": true, "field": "version" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "server_id" },
          { "type": "int64", "optional": false, "field": "ts_sec" },
          { "type": "string", "optional": true, "field": "gtid" },
          { "type": "string", "optional": false, "field": "file" },
          { "type": "int64", "optional": false, "field": "pos" },
          { "type": "int32", "optional": false, "field": "row" },
          { "type": "boolean", "optional": true, "field": "snapshot" },
          { "type": "int64", "optional": true, "field": "thread" },
          { "type": "string", "optional": true, "field": "db" },
          { "type": "string", "optional": true, "field": "table" }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.9.8.Final",
      "name": "dbserver1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers"
    },
    "op": "r",
    "ts_ms": 1486500577691
  }
}
This portion of the event is much longer, but like the event’s key, it also has a schema and a payload. The schema contains a Kafka Connect schema named dbserver1.inventory.customers.Envelope (version 1) that can contain five fields:
op
A required field that contains a string value describing the type of operation. Values for the MySQL connector are c for create (or insert), u for update, d for delete, and r for read (in the case of a snapshot).
before
An optional field that, if present, contains the state of the row before the event occurred. The structure will be described by the dbserver1.inventory.customers.Value Kafka Connect schema, which the dbserver1 connector uses for all rows in the inventory.customers table.
after
An optional field that, if present, contains the state of the row after the event occurred. The structure is described by the same dbserver1.inventory.customers.Value Kafka Connect schema used in before.
source
A required field that contains a structure describing the source metadata for the event, which in the case of MySQL, contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the MySQL thread ID that made the change, whether this event was part of a snapshot, and, if available, the MySQL server ID, and the timestamp in seconds.
ts_ms
An optional field that, if present, contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
The JSON representations of the events are much longer than the rows they describe. This is because, with every event key and value, Kafka Connect ships the schema that describes the payload. Over time, this structure may change. However, having the schemas for the key and the value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time.
The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. This is how the connector ensures that each event is structured exactly like the table from which it originated at the time the event occurred. As a result, the Kafka topic that contains all of the events for a single table might hold events that correspond to each state of the table definition.
The JSON converter includes the key and value schemas in every message, so it does produce very verbose events. Alternatively, you can use Apache Avro as a serialization format, which results in far smaller event messages. This is because it transforms each Kafka Connect schema into an Avro schema and stores the Avro schemas in a separate Schema Registry service. Thus, when the Avro converter serializes an event message, it places only a unique identifier for the schema along with an Avro-encoded binary representation of the value. As a result, the serialized messages that are transferred over the wire and stored in Kafka are far smaller than what you have seen here. In fact, the Avro Converter is able to use Avro schema evolution techniques to maintain the history of each schema in the Schema Registry.
-
Compare the event’s key and value schemas to the state of the inventory database. In the terminal that is running the MySQL command line client, run the following statement:
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
This shows that the event records you reviewed match the records in the database.
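A consuming application typically reads the key and value documents and acts on the payload fields described in this procedure. The following is a minimal sketch of that decoding step using only the Python standard library. The `describe_event` function is a hypothetical helper, and the sample documents omit the schema section for brevity; real events carry the full schema shown earlier.

```python
import json

# Operation codes as listed in the description of the op field.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_event(key_json, value_json):
    """Summarize one change event from its JSON key and value documents."""
    key = json.loads(key_json)["payload"]
    payload = json.loads(value_json)["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], "unknown"),
        "table": f"{source['db']}.{source['table']}",
        "key": key,
        "row": payload["after"],
        "from_snapshot": bool(source.get("snapshot")),
    }


# Abbreviated copies of the key and value of the last snapshot event.
# The schema sections are left empty here (an assumption made for brevity).
key_json = '{"schema": {}, "payload": {"id": 1004}}'
value_json = json.dumps({
    "schema": {},
    "payload": {
        "before": None,
        "after": {"id": 1004, "first_name": "Anne",
                  "last_name": "Kretchmar", "email": "annek@noanswer.org"},
        "source": {"name": "dbserver1", "snapshot": True,
                   "db": "inventory", "table": "customers"},
        "op": "r",
        "ts_ms": 1486500577691,
    },
})

event = describe_event(key_json, value_json)
print(event)
```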
Updating the database and viewing the update event
Now that you have seen how the Debezium MySQL connector captured the create events in the inventory database,
you will change one of the records and see how the connector captures it.
By completing this procedure, you will learn how to find details about what changed in a database commit, and how you can compare change events to determine when the change occurred in relation to other changes.
-
In the terminal that is running the MySQL command line client, run the following statement:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0
-
View the updated customers table:
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
-
Switch to the terminal running watch-topic to see a new fifth event.
By changing a record in the customers table, the Debezium MySQL connector generated a new event. You should see two new JSON documents: one for the event’s key, and one for the new event’s value.
Here are the details of the key for the update event (formatted for readability):
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
This key is the same as the key for the previous events.
Here is that new event’s value. There are no changes in the schema section, so only the payload section is shown (formatted for readability):
{
  "schema": {...},
  "payload": {
    "before": {  (1)
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {  (2)
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {  (3)
      "version": "1.9.8.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501486,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "u",  (4)
    "ts_ms": 1486501486308  (5)
  }
}
(1) The before field now has the state of the row with the values before the database commit.
(2) The after field now has the updated state of the row, and the first_name value is now Anne Marie.
(3) The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might have changed in other circumstances).
(4) The op field value is now u, signifying that this row changed because of an update.
(5) The ts_ms field shows the time stamp for when Debezium processed this event.
By viewing the payload section, you can learn several important things about the update event:
-
By comparing the before and after structures, you can determine what actually changed in the affected row because of the commit.
-
By reviewing the source structure, you can find information about MySQL’s record of the change (providing traceability).
-
By comparing the payload section of an event to other events in the same topic (or a different topic), you can determine whether the event occurred before, after, or as part of the same MySQL commit as another event.
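Comparing the before and after structures is easy to automate. The sketch below computes the columns whose values differ in an update event's payload; `changed_columns` is a hypothetical helper, and the sample payload is an abbreviated copy of the update event from this procedure.

```python
def changed_columns(payload):
    """Return {column: (old_value, new_value)} for columns that differ.

    A missing or null before/after structure is treated as an empty row.
    """
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(before.keys() | after.keys())
        if before.get(column) != after.get(column)
    }


# Abbreviated payload of the update event (source metadata omitted).
update_payload = {
    "before": {"id": 1004, "first_name": "Anne",
               "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "after": {"id": 1004, "first_name": "Anne Marie",
              "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "op": "u",
}

diff = changed_columns(update_payload)
print(diff)
```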
Deleting a record in the database and viewing the delete event
Now that you have seen how the Debezium MySQL connector captured the create and update events in the inventory database,
you will delete one of the records and see how the connector captures it.
By completing this procedure, you will learn how to find details about delete events, and how Kafka uses log compaction to reduce the number of delete events while still enabling consumers to get all of the events.
-
In the terminal that is running the MySQL command line client, run the following statement:
mysql> DELETE FROM customers WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
If the above command fails with a foreign key constraint violation, then you must first remove the reference to the customer from the addresses table using the following statement, and then run the DELETE statement for the customers table again:
mysql> DELETE FROM addresses WHERE customer_id=1004;
-
Switch to the terminal running watch-topic to see two new events.
By deleting a row in the customers table, the Debezium MySQL connector generated two new events.
-
Review the key and value for the first new event.
Here are the details of the key for the first new event (formatted for readability):
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
This key is the same as the key in the previous two events you looked at.
Here is the value of the first new event (formatted for readability):
{
  "schema": {...},
  "payload": {
    "before": {  (1)
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null,  (2)
    "source": {  (3)
      "version": "1.9.8.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501558,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 725,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "d",  (4)
    "ts_ms": 1486501558315  (5)
  }
}
(1) The before field now has the state of the row that was deleted with the database commit.
(2) The after field is null because the row no longer exists.
(3) The source field structure has many of the same values as before, except the ts_sec and pos fields have changed (the file might have changed in other circumstances).
(4) The op field value is now d, signifying that this row was deleted.
(5) The ts_ms field shows the time stamp for when Debezium processed this event.
Thus, this event provides a consumer with the information that it needs to process the removal of the row. The old values are also provided, because some consumers might require them to properly handle the removal.
-
Review the key and value for the second new event.
Here is the key for the second new event (formatted for readability):
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
Once again, this key is exactly the same key as in the previous three events you looked at.
Here is the value of that same event (formatted for readability):
{
  "schema": null,
  "payload": null
}
If Kafka is set up to be log compacted, it will remove older messages from the topic if there is at least one message later in the topic with the same key. This last event is called a tombstone event, because it has a key and an empty value. This means that Kafka will remove all prior messages with the same key. Even though the prior messages will be removed, the tombstone event means that consumers can still read the topic from the beginning and not miss any events.
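The effect of log compaction on a key that ends with a tombstone can be illustrated with a toy model. The sketch below is not how Kafka implements compaction; it only keeps the latest record for each key and, optionally, drops tombstones to mimic their eventual removal. The `compact` function, the `drop_tombstones` parameter, and the abbreviated records in `topic` are all assumptions made for illustration.

```python
def compact(records, drop_tombstones=False):
    """Keep only the latest record per key, preserving topic order.

    records is a list of (key, value) pairs; a value of None is a tombstone.
    """
    latest_offset = {}
    for offset, (key, _value) in enumerate(records):
        latest_offset[key] = offset

    kept = []
    for offset, (key, value) in enumerate(records):
        if latest_offset[key] != offset:
            continue  # a later record with the same key supersedes this one
        if value is None and drop_tombstones:
            continue  # the tombstone itself is eventually removed as well
        kept.append((key, value))
    return kept


# Abbreviated history of the customers topic for two keys.
topic = [
    (1003, {"op": "r", "first_name": "Edward"}),
    (1004, {"op": "r", "first_name": "Anne"}),
    (1004, {"op": "u", "first_name": "Anne Marie"}),
    (1004, {"op": "d"}),
    (1004, None),  # tombstone
]

print(compact(topic))
print(compact(topic, drop_tombstones=True))
```

In this model, a consumer that reads the compacted topic from the beginning still sees the tombstone for key 1004 until the tombstone is dropped, so it can tell that the row was deleted.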
Restarting the Kafka Connect service
Now that you have seen how the Debezium MySQL connector captures create, update, and delete events, you will see how it captures changes that occur while it is not running.
The Kafka Connect service automatically manages tasks for its registered connectors. Therefore, if the service goes offline, it starts any non-running tasks when it restarts. This means that changes made to the database while Debezium is stopped are not lost: the connector reports them after it restarts.
In this procedure, you will stop Kafka Connect, change some data in the database, and then restart Kafka Connect to see the change events.
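The reason no changes are lost is that the connector records how far it has read and continues from that position after a restart. The following toy model sketches the idea with a list standing in for the MySQL binlog and a dictionary standing in for the stored offsets; the names `binlog`, `offset_store`, and `poll`, and the entry strings, are hypothetical and do not correspond to Debezium APIs.

```python
def poll(binlog, offset_store):
    """Return the entries written since the stored position, then advance it."""
    start = offset_store["position"]
    events = binlog[start:]
    offset_store["position"] = len(binlog)
    return events


# Stand-in for the binlog; the entries are illustrative strings.
binlog = ["update customer 1004", "delete customer 1004"]
# Stand-in for the offsets that Kafka Connect persists for the connector.
offset_store = {"position": 0}

# The connector is running and reads everything written so far.
first_run = poll(binlog, offset_store)

# The connector is stopped; the database keeps writing to its binlog.
binlog.append("insert customer Sarah Thompson")
binlog.append("insert customer Kenneth Anderson")

# After a restart, reading resumes from the stored position.
after_restart = poll(binlog, offset_store)

print(first_run)
print(after_restart)
```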
-
Open a new terminal and use it to stop the connect container that is running the Kafka Connect service:
$ docker stop connect
The connect container is stopped, and the Kafka Connect service gracefully shuts down.
Because you ran the container with the --rm option, Docker removes the container once it stops.
-
While the service is down, switch to the terminal for the MySQL command line client, and add a few records:
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
The records are added to the database. However, because Kafka Connect is not running, watch-topic does not record any updates.
In a production system, you would have enough brokers to handle the producers and consumers, and to maintain a minimum number of in-sync replicas (ISRs) for each topic. If so many brokers failed that the minimum number of ISRs was no longer available, Kafka would become unavailable. In this scenario, producers (like the Debezium connectors) and consumers wait for the Kafka cluster or network to recover. This means that, temporarily, your consumers might not see any change events as data is changed in the databases, because no change events are being produced. As soon as the Kafka cluster is restarted or the network recovers, Debezium resumes producing change events, and your consumers resume consuming events where they left off.
-
-
Open a new terminal, and use it to restart the Kafka Connect service in a container.
This command starts Kafka Connect using the same options you used when you initially started it:
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9
The Kafka Connect service starts, connects to Kafka, reads the previous service’s configuration, and starts the registered connectors that will resume where they last left off.
Here are the last few lines from this restarted service:
...
2021-11-30 01:49:07,938 INFO || Get all known binlogs from MySQL [io.debezium.connector.mysql.MySqlConnection]
2021-11-30 01:49:07,941 INFO || MySQL has the binlog file 'mysql-bin.000003' required by the connector [io.debezium.connector.mysql.MySqlConnectorTask]
2021-11-30 01:49:07,967 INFO || Requested thread factory for connector MySqlConnector, id = dbserver1 named = change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:49:07,968 INFO || Creating thread debezium-mysqlconnector-dbserver1-change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:49:07,968 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-11-30 01:49:07,971 INFO MySQL|dbserver1|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:49:07,971 INFO MySQL|dbserver1|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:49:07,976 INFO MySQL|dbserver1|snapshot A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted. [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:49:07,977 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=SKIPPED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:49:07,981 INFO MySQL|dbserver1|streaming Requested thread factory for connector MySqlConnector, id = dbserver1 named = binlog-client [io.debezium.util.Threads]
2021-11-30 01:49:07,983 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
These lines show that the service found the offsets previously recorded by the last task before it was shut down, connected to the MySQL database, started reading the binlog from that position, and generated events from any changes in the MySQL database since that point in time.
-
Switch to the terminal running watch-topic to see events for the two new records you created when Kafka Connect was offline:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"1.9.8.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"1.9.8.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}
These events are create events, similar to the ones you saw previously. As you can see, Debezium still reports all of the changes made to a database while it was not running (as long as it is restarted before the MySQL database purges the missed commits from its binlog).
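The event values shown above are verbose, so a consuming application typically reads only a few fields from the payload. The following Python sketch is one possible way to do that, not part of Debezium itself: the function name summarize_change_event and the OP_NAMES table are hypothetical, and the sample input is a trimmed copy of the first event value with the schema portion omitted.

```python
import json

# Readable names for the "op" codes found in Debezium change events.
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_change_event(raw_value: str) -> dict:
    """Return the operation, table, binlog position, and row state of one event value."""
    event = json.loads(raw_value)
    payload = event["payload"]
    source = payload["source"]
    return {
        # Fall back to the raw code if it is not in the table above.
        "operation": OP_NAMES.get(payload["op"], payload["op"]),
        "table": f'{source["db"]}.{source["table"]}',
        "binlog": (source["file"], source["pos"]),
        "before": payload["before"],
        "after": payload["after"],
    }


# Trimmed copy of the first event value above ("schema" omitted for brevity).
sample_value = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 1005, "first_name": "Sarah",
                  "last_name": "Thompson", "email": "kitt@acme.com"},
        "source": {"name": "dbserver1", "file": "mysql-bin.000003",
                   "pos": 1046, "db": "inventory", "table": "customers"},
        "op": "c",
        "ts_ms": 1490635181455,
    }
})

summary = summarize_change_event(sample_value)
print(summary["operation"], summary["table"], summary["after"]["id"])
# prints: create inventory.customers 1005
```

For a create event, before is null and after holds the new row, which is why the sketch returns both fields instead of assuming that either one is present.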
Cleaning up
After you are finished with the tutorial, you can use Docker to stop all of the running containers.
-
Stop each of the containers:
$ docker stop mysqlterm watcher connect mysql kafka zookeeper
Docker stops each container. Because you used the --rm option when you started them, Docker also removes them.
|
If you use Podman, run the following command:
|
-
Verify that all of the processes have stopped and have been removed:
$ docker ps -a
If any of the processes are still running, stop them using docker stop <process-name> or docker stop <containerId>.
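If you prefer to script the verification step, the following Python sketch shows one way to check a container listing for leftovers. It is an illustration only: the helper names leftover_containers and list_container_names are hypothetical, and the sketch assumes the container names from the docker stop command above. The docker ps -a --format "{{.Names}}" invocation prints one container name per line.

```python
import subprocess

# Container names used by the docker stop command in this tutorial.
TUTORIAL_CONTAINERS = ("mysqlterm", "watcher", "connect", "mysql", "kafka", "zookeeper")


def leftover_containers(names_output: str, expected=TUTORIAL_CONTAINERS) -> list:
    """Return the tutorial containers that still appear in a name listing."""
    present = {line.strip() for line in names_output.splitlines() if line.strip()}
    return [name for name in expected if name in present]


def list_container_names() -> str:
    """Run docker ps -a and return its output, one container name per line."""
    result = subprocess.run(
        ["docker", "ps", "-a", "--format", "{{.Names}}"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


# Hypothetical listing in which two containers have not been removed yet.
sample_listing = "kafka\nzookeeper\n"
print(leftover_containers(sample_listing))
# prints: ['kafka', 'zookeeper']
```

On a machine where Docker is installed, calling leftover_containers(list_container_names()) checks the live system. An empty list means that all of the tutorial containers are gone.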
Next steps
After completing the tutorial, consider the following next steps:
-
Explore the tutorial further.
Use the MySQL command line client to add, modify, and remove rows in the database tables, and see the effect on the topics. You may need to run a separate watch-topic command for each topic. Keep in mind that you cannot remove a row that is referenced by a foreign key.
-
Try running the tutorial with Debezium connectors for Postgres, MongoDB, SQL Server, and Oracle.
You can use the Docker Compose version of this tutorial located in the Debezium examples repository. Docker Compose files are provided for running the tutorial with MySQL, Postgres, MongoDB, SQL Server, and Oracle.