Tutorial for Debezium 0.2

This tutorial walks you through running Debezium 0.2 for change data capture (CDC). You will use Docker (1.9 or later) to start the Debezium services, run a MySQL database server with a simple example database, use Debezium to monitor the database, and see the resulting event streams respond as the data in the database changes.

Debezium 0.2 is no longer being maintained. The link:/docs/tutorial[newest tutorial] uses the latest Debezium version.

What is Debezium?

Debezium is a distributed platform that turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases. Debezium is built on top of Apache Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, from where your application consumes them. This makes it possible for your application to easily consume all of the events correctly and completely. Even if your application stops (or crashes), upon restart it will start consuming the events exactly where it left off, so it misses nothing.

Debezium 0.2 includes support for monitoring MySQL database servers with its MySQL connector, and this is what we’ll use in this demonstration. Support for other DBMSes will be added in future releases.

Running Debezium with Docker

Running Debezium involves three major services: Zookeeper, Kafka, and Debezium’s connector service. This tutorial walks you through starting a single instance of each service using Docker and Debezium’s Docker images. Production environments, on the other hand, require running multiple instances of each service to provide performance, reliability, replication, and fault tolerance. This can be done with a platform like OpenShift or Kubernetes that manages multiple Docker containers running on multiple hosts and machines, though often you’ll want to install on dedicated hardware.

Starting Docker

Make sure that Docker is installed and running. If you’re running Linux, you can either run the Docker daemon or configure it to run automatically on startup. In this case, the Docker host is your local machine.

If you’re using Windows or OS X, you have to run the Docker daemon in a virtual machine. See our Docker documentation for detailed instructions on installing, starting, and using Docker Machine. Every time you start a new terminal, you need to configure the Docker Machine environment so any Docker commands you run on your host computer can communicate with the Docker daemon running in the virtual machine:

$ eval $(docker-machine env)

Starting simple with Debezium

For simple evaluation and experimentation, this tutorial will walk you through starting a single instance of each service in a separate container on your local machine. Zookeeper and Kafka both store data locally inside the container, and normal usage requires mounting directories on the host machines as volumes so that when the containers stop the persisted data will remain. We’re skipping that in this tutorial, although the documentation for our Docker images describes how to do that. This means that when a container is removed, all persisted data will be lost. That’s actually ideal for our experiment, since nothing will be left on your computer when we’re finished, and you can run this experiment many times without having to clean anything up in between.
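For reference, here is a sketch of what mounting host directories might look like when you do want Zookeeper’s data to survive container removal. The host paths are placeholders, and the container paths /zookeeper/data and /zookeeper/txns are assumptions based on the image documentation, so check there before relying on them:

$ docker run -it --name zookeeper -v /path/on/host/zk-data:/zookeeper/data -v /path/on/host/zk-txns:/zookeeper/txns -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.2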

Running multiple services locally can be confusing, so we’re going to use a separate terminal to run each container in the foreground. This way all of the output of a container will be displayed in the terminal used to run it.

This is not the only way to run Docker containers. Rather than running a container in the foreground (with -it), Docker lets you run a container in detached mode (with -d), where the container is started and the Docker command returns immediately. Detached mode containers don’t display their output in the terminal, though you can always see the output by using docker logs --follow <container-name>. This is one reason we name each of the containers we run. See the Docker documentation for more detail.
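For example, the Zookeeper container we start in the next section could instead be run in detached mode and observed like this (shown for illustration only; in this tutorial we run everything in the foreground):

$ docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.2
$ docker logs --follow zookeeper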

Start Zookeeper

Of all the different services/processes that make up Debezium, the first one to start is Zookeeper. Start a new terminal with the Docker environment, and then start a container with Zookeeper by running:

$ docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.2

This runs a new container using version 0.2 of the debezium/zookeeper image, and assigns the name zookeeper to this container. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The three -p options map three of the container’s ports (2181, 2888, and 3888) to the same ports on the Docker host so that other containers (and software outside the container) can talk with Zookeeper.

You should see in your terminal the typical output of Zookeeper:

Starting up in standalone mode
JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2016-06-09 15:28:19,369 - INFO  [main:QuorumPeerConfig@103] - Reading configuration from: /zookeeper/conf/zoo.cfg
2016-06-09 15:28:19,377 - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2016-06-09 15:28:19,377 - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
2016-06-09 15:28:19,379 - WARN  [main:QuorumPeerMain@113] - Either no config or no quorum defined in config, running  in standalone mode
2016-06-09 15:28:19,384 - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2016-06-09 15:28:19,403 - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
2016-06-09 15:28:19,415 - INFO  [main:QuorumPeerConfig@103] - Reading configuration from: /zookeeper/conf/zoo.cfg
2016-06-09 15:28:19,415 - INFO  [main:ZooKeeperServerMain@95] - Starting server
2016-06-09 15:28:19,425 - INFO  [main:Environment@100] - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2016-06-09 15:28:19,425 - INFO  [main:Environment@100] - Server environment:host.name=5604dd28548d
2016-06-09 15:28:19,425 - INFO  [main:Environment@100] - Server environment:java.version=1.8.0_92
2016-06-09 15:28:19,425 - INFO  [main:Environment@100] - Server environment:java.vendor=Azul Systems, Inc.
2016-06-09 15:28:19,426 - INFO  [main:Environment@100] - Server environment:java.home=/usr/lib/jvm/zulu-8-amd64/jre
2016-06-09 15:28:19,426 - INFO  [main:Environment@100] - Server environment:java.class.path=/zookeeper/bin/../build/classes:/zookeeper/bin/../build/lib/*.jar:/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/zookeeper/bin/../lib/log4j-1.2.16.jar:/zookeeper/bin/../lib/jline-0.9.94.jar:/zookeeper/bin/../zookeeper-3.4.6.jar:/zookeeper/bin/../src/java/lib/*.jar:/zookeeper/conf:
2016-06-09 15:28:19,426 - INFO  [main:Environment@100] - Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2016-06-09 15:28:19,430 - INFO  [main:Environment@100] - Server environment:java.io.tmpdir=/tmp
2016-06-09 15:28:19,435 - INFO  [main:Environment@100] - Server environment:java.compiler=<NA>
2016-06-09 15:28:19,435 - INFO  [main:Environment@100] - Server environment:os.name=Linux
2016-06-09 15:28:19,435 - INFO  [main:Environment@100] - Server environment:os.arch=amd64
2016-06-09 15:28:19,436 - INFO  [main:Environment@100] - Server environment:os.version=4.1.17-boot2docker
2016-06-09 15:28:19,436 - INFO  [main:Environment@100] - Server environment:user.name=zookeeper
2016-06-09 15:28:19,436 - INFO  [main:Environment@100] - Server environment:user.home=/zookeeper
2016-06-09 15:28:19,436 - INFO  [main:Environment@100] - Server environment:user.dir=/zookeeper
2016-06-09 15:28:19,438 - INFO  [main:ZooKeeperServer@755] - tickTime set to 2000
2016-06-09 15:28:19,438 - INFO  [main:ZooKeeperServer@764] - minSessionTimeout set to -1
2016-06-09 15:28:19,439 - INFO  [main:ZooKeeperServer@773] - maxSessionTimeout set to -1
2016-06-09 15:28:19,448 - INFO  [main:NIOServerCnxnFactory@94] - binding to port 0.0.0.0/0.0.0.0:2181

The last line is important and reports that Zookeeper is ready and listening on port 2181. The terminal will continue to show additional output as Zookeeper generates it.
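If you’d like to double-check that Zookeeper is reachable from outside the container, you can send it the standard ruok four-letter command; a healthy server answers imok. This assumes the nc (netcat) utility is installed on your machine; on Linux, use localhost in place of the docker-machine ip expression:

$ echo ruok | nc $(docker-machine ip) 2181
imok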

Start Kafka

Open a new terminal and configure it with the Docker environment. Then, start Kafka in a new container by running:

$ docker run -it --name kafka -p 9092:9092 -e ADVERTISED_HOST_NAME=$(docker-machine ip) --link zookeeper:zookeeper debezium/kafka:0.2

On Linux, simply replace the $(docker-machine ip) portion of the command with the public IP address of the host machine, and do the same in any other commands in which it appears.

This runs a new container using version 0.2 of the debezium/kafka image, and assigns the name kafka to this container. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The command maps port 9092 in the container to the same port on the Docker host so that software outside of the container can talk with Kafka. The command uses the -e option to set the ADVERTISED_HOST_NAME environment variable in the container to the IP address of the Docker host, and the container uses this environment variable when configuring and starting Kafka so that Kafka properly reports to clients the IP address where it can be found. In our case, we use the docker-machine ip command to obtain the IP address of the Docker host. Finally, the command uses the --link zookeeper:zookeeper argument to tell the container that it can find Zookeeper in the container named zookeeper running on the same Docker host.
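On Linux, one way to capture the host’s address in a shell variable and reuse it is shown below; hostname -I is an assumption that works on most distributions, and it may list several addresses, of which the first is taken here:

$ HOST_IP=$(hostname -I | awk '{print $1}')
$ docker run -it --name kafka -p 9092:9092 -e ADVERTISED_HOST_NAME=$HOST_IP --link zookeeper:zookeeper debezium/kafka:0.2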

You should see in your terminal the typical output of Kafka, ending with:

...
2016-06-09 15:30:15,065 - INFO  [main-EventThread:ZkClient@711] - zookeeper state changed (SyncConnected)
2016-06-09 15:30:15,160 - INFO  [main:Logging$class@68] - Loading logs.
2016-06-09 15:30:15,169 - INFO  [main:Logging$class@68] - Logs loading complete.
2016-06-09 15:30:15,400 - INFO  [main:Logging$class@68] - Starting log cleanup with a period of 300000 ms.
2016-06-09 15:30:15,402 - INFO  [main:Logging$class@68] - Starting log flusher with a default period of 9223372036854775807 ms.
2016-06-09 15:30:15,404 - WARN  [main:Logging$class@83] - No meta.properties file under dir /kafka/data/1/meta.properties
2016-06-09 15:30:15,460 - INFO  [main:Logging$class@68] - Awaiting socket connections on 0.0.0.0:9092.
2016-06-09 15:30:15,464 - INFO  [main:Logging$class@68] - [Socket Server on Broker 1], Started 1 acceptor threads
2016-06-09 15:30:15,486 - INFO  [ExpirationReaper-1:Logging$class@68] - [ExpirationReaper-1], Starting
2016-06-09 15:30:15,488 - INFO  [ExpirationReaper-1:Logging$class@68] - [ExpirationReaper-1], Starting
2016-06-09 15:30:15,541 - INFO  [main:Logging$class@68] - Creating /controller (is it secure? false)
2016-06-09 15:30:15,548 - INFO  [main:Logging$class@68] - Result of znode creation is: OK
2016-06-09 15:30:15,549 - INFO  [main:Logging$class@68] - 1 successfully elected as leader
2016-06-09 15:30:15,639 - INFO  [main:Logging$class@68] - [GroupCoordinator 1]: Starting up.
2016-06-09 15:30:15,645 - INFO  [group-metadata-manager-0:Logging$class@68] - [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 4 milliseconds.
2016-06-09 15:30:15,648 - INFO  [ExpirationReaper-1:Logging$class@68] - [ExpirationReaper-1], Starting
2016-06-09 15:30:15,648 - INFO  [main:Logging$class@68] - [GroupCoordinator 1]: Startup complete.
2016-06-09 15:30:15,647 - INFO  [ExpirationReaper-1:Logging$class@68] - [ExpirationReaper-1], Starting
2016-06-09 15:30:15,732 - INFO  [ThrottledRequestReaper-Produce:Logging$class@68] - [ThrottledRequestReaper-Produce], Starting
2016-06-09 15:30:15,741 - INFO  [ThrottledRequestReaper-Fetch:Logging$class@68] - [ThrottledRequestReaper-Fetch], Starting
2016-06-09 15:30:15,763 - INFO  [main:Logging$class@68] - Will not load MX4J, mx4j-tools.jar is not in the classpath
2016-06-09 15:30:15,787 - INFO  [ZkClient-EventThread-14-172.17.0.3:2181:Logging$class@68] - New leader is 1
2016-06-09 15:30:15,791 - INFO  [main:Logging$class@68] - Creating /brokers/ids/1 (is it secure? false)
2016-06-09 15:30:15,801 - INFO  [main:Logging$class@68] - Result of znode creation is: OK
2016-06-09 15:30:15,803 - INFO  [main:Logging$class@68] - Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(192.168.99.100,9092,PLAINTEXT)
2016-06-09 15:30:15,815 - INFO  [main:AppInfoParser$AppInfo@82] - Kafka version : 0.9.0.1
2016-06-09 15:30:15,816 - INFO  [main:AppInfoParser$AppInfo@83] - Kafka commitId : 23c69d62a0cabf06
2016-06-09 15:30:15,818 - INFO  [main:Logging$class@68] - [Kafka Server 1], started

The last line shown above reports that the Kafka broker has successfully started and is ready for client connections. The terminal will continue to show additional output as Kafka generates it.

Debezium uses Kafka Connect, which Kafka introduced in 0.9.0.0. We’re using Kafka 0.9.0.1 in this tutorial since at the time of writing it is the latest patch release of 0.9.0.x. Debezium will also work with more recent versions of Kafka.

Start Kafka Connect

Open a new terminal and configure it with the Docker environment. In that terminal, start the Kafka Connect service in a new container by running:

$ docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(docker-machine ip) --link zookeeper:zookeeper --link kafka:kafka debezium/connect:0.2

This runs a new container using version 0.2 of the debezium/connect image, and assigns the name connect to this container. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The command maps port 8083 in the container to the same port on the Docker host so that software outside of the container can use Kafka Connect’s REST API to set up and manage new connector instances. The command uses the --link zookeeper:zookeeper and --link kafka:kafka arguments to tell the container that it can find Zookeeper and Kafka in the containers named zookeeper and kafka, respectively, running on the same Docker host. And finally, it also uses the -e option four times to set the GROUP_ID, CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC, and ADVERTISED_HOST_NAME environment variables; the first three are required by this container (you can use different values as desired), while the last variable is optional but is used by the Kafka Connect server process to tell clients and other service instances the address at which it is listening. In our case, we use the docker-machine ip command to obtain the IP address of the Docker host.

You should see in your terminal the typical output of the Kafka Connect service, ending with:

...
2016-06-09 15:35:22,257 - INFO  [DistributedHerder:AppInfoParser$AppInfo@82] - Kafka version : 0.9.0.1
2016-06-09 15:35:22,259 - INFO  [DistributedHerder:AppInfoParser$AppInfo@83] - Kafka commitId : 23c69d62a0cabf06
2016-06-09 15:35:22,491 - INFO  [main:Server@327] - jetty-9.2.12.v20150709
2016-06-09 15:35:22,837 - INFO  [DistributedHerder:KafkaBasedLog@143] - Finished reading KafakBasedLog for topic my-connect-configs
2016-06-09 15:35:22,837 - INFO  [DistributedHerder:KafkaBasedLog@145] - Started KafakBasedLog for topic my-connect-configs
2016-06-09 15:35:22,838 - INFO  [DistributedHerder:KafkaConfigStorage@242] - Started KafkaConfigStorage
2016-06-09 15:35:22,838 - INFO  [DistributedHerder:DistributedHerder@156] - Herder started
2016-06-09 15:35:23,112 - INFO  [DistributedHerder:DistributedHerder$14@868] - Joined group and got assignment: Assignment{error=0, leader='connect-1-f84dd8fb-ec0d-485f-8b3d-657746927ef2', leaderUrl='http://172.17.0.5:8083/', offset=-1, connectorIds=[], taskIds=[]}
2016-06-09 15:35:23,119 - INFO  [DistributedHerder:DistributedHerder@639] - Starting connectors and tasks using config offset -1
2016-06-09 15:35:23,120 - INFO  [DistributedHerder:DistributedHerder@659] - Finished starting connectors and tasks
Jun 09, 2016 3:35:23 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

2016-06-09 15:35:23,706 - INFO  [main:ContextHandler@744] - Started o.e.j.s.ServletContextHandler@b78a709{/,null,AVAILABLE}
2016-06-09 15:35:23,722 - INFO  [main:AbstractConnector@266] - Started ServerConnector@2e58f579{HTTP/1.1}{172.17.0.5:8083}
2016-06-09 15:35:23,722 - INFO  [main:Server@379] - Started @5447ms
2016-06-09 15:35:23,724 - INFO  [main:RestServer@132] - REST server listening at http://172.17.0.5:8083/, advertising URL http://172.17.0.5:8083/
2016-06-09 15:35:23,724 - INFO  [main:Connect@60] - Kafka Connect started

The last line shown above reports that the service has started and is ready for connections. The terminal will continue to show additional output as the Kafka Connect service generates it.

Using the Kafka Connect REST API

The Kafka Connect service exposes a RESTful API to manage the set of connectors, so let’s use that API via the curl command line tool. Because we mapped port 8083 in the connect container (where the Kafka Connect service is running) to port 8083 on the Docker host, we can communicate with the service by sending the request to port 8083 on the Docker host, which then forwards the request to the Kafka Connect service.

Open a new terminal and configure it with the Docker environment, and in that terminal run the following command to check the status of the Kafka Connect service:

$ curl -H "Accept:application/json" $(docker-machine ip):8083/

The Kafka Connect service should return a JSON response message similar to the following:

{"version":"0.9.0.1","commit":"23c69d62a0cabf06"}

This shows that we’re running Kafka Connect version 0.9.0.1. Next, check the list of connectors:

$ curl -H "Accept:application/json" $(docker-machine ip):8083/connectors/

which should return the following:

[]

This confirms that the Kafka Connect service is running, that we can talk with it, and that it currently has no connectors.

Start a MySQL database

At this point, we’ve started Zookeeper, Kafka, and Kafka Connect, but we’ve not yet configured Kafka Connect to run any connectors. In other words, the basic Debezium services are running but they’re not yet watching any databases. Before we can set up connectors, we first need a relational database to monitor.

Open a new terminal and configure it with the Docker environment. In that terminal, start a new container that runs a MySQL database server preconfigured with an inventory database:

$ docker run -it --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.2

This runs a new container using version 0.2 of the debezium/example-mysql image, which is based on the mysql:5.7 image, defines and populates a sample "inventory" database, and creates a debezium user with password dbz that has the minimum privileges required by Debezium’s MySQL connector. The command assigns the name mysql to the container so that it can be easily referenced later. The -it flag makes the container interactive, meaning it attaches the terminal’s standard input and output to the container so that you can see what is going on in the container. The command maps port 3306 (the default MySQL port) in the container to the same port on the Docker host so that software outside of the container can connect to the database server. And finally, it also uses the -e option three times to set the MYSQL_ROOT_PASSWORD, MYSQL_USER, and MYSQL_PASSWORD environment variables to specific values.

You should see in your terminal something like the following:

...
MySQL init process done. Ready for start up.

2016-06-09T15:38:14.731166Z 0 [Note] mysqld (mysqld 5.7.12-log) starting as process 1 ...
2016-06-09T15:38:14.734891Z 0 [Note] InnoDB: PUNCH HOLE support available
2016-06-09T15:38:14.734957Z 0 [Note] InnoDB: Mutexes and rw_locks use GCC atomic builtins
2016-06-09T15:38:14.734976Z 0 [Note] InnoDB: Uses event mutexes
2016-06-09T15:38:14.734992Z 0 [Note] InnoDB: GCC builtin __atomic_thread_fence() is used for memory barrier
2016-06-09T15:38:14.735008Z 0 [Note] InnoDB: Compressed tables use zlib 1.2.8
2016-06-09T15:38:14.735023Z 0 [Note] InnoDB: Using Linux native AIO
2016-06-09T15:38:14.735248Z 0 [Note] InnoDB: Number of pools: 1
2016-06-09T15:38:14.735374Z 0 [Note] InnoDB: Using CPU crc32 instructions
2016-06-09T15:38:14.740691Z 0 [Note] InnoDB: Initializing buffer pool, total size = 128M, instances = 1, chunk size = 128M
2016-06-09T15:38:14.745890Z 0 [Note] InnoDB: Completed initialization of buffer pool
2016-06-09T15:38:14.747038Z 0 [Note] InnoDB: If the mysqld execution user is authorized, page cleaner thread priority can be changed. See the man page of setpriority().
2016-06-09T15:38:14.758897Z 0 [Note] InnoDB: Highest supported file format is Barracuda.
2016-06-09T15:38:14.768080Z 0 [Note] InnoDB: Creating shared tablespace for temporary tables
2016-06-09T15:38:14.768201Z 0 [Note] InnoDB: Setting file './ibtmp1' size to 12 MB. Physically writing the file full; Please wait ...
2016-06-09T15:38:14.794327Z 0 [Note] InnoDB: File './ibtmp1' size is now 12 MB.
2016-06-09T15:38:14.795388Z 0 [Note] InnoDB: 96 redo rollback segment(s) found. 96 redo rollback segment(s) are active.
2016-06-09T15:38:14.795428Z 0 [Note] InnoDB: 32 non-redo rollback segment(s) are active.
2016-06-09T15:38:14.795826Z 0 [Note] InnoDB: Waiting for purge to start
2016-06-09T15:38:14.846166Z 0 [Note] InnoDB: 5.7.12 started; log sequence number 12164862
2016-06-09T15:38:14.846511Z 0 [Note] Plugin 'FEDERATED' is disabled.
2016-06-09T15:38:14.848709Z 0 [Note] InnoDB: Loading buffer pool(s) from /var/lib/mysql/ib_buffer_pool
2016-06-09T15:38:14.868821Z 0 [Note] InnoDB: Buffer pool(s) load completed at 160609 15:38:14
2016-06-09T15:38:14.875260Z 0 [Warning] Failed to set up SSL because of the following SSL library error: SSL context is not usable without certificate and private key
2016-06-09T15:38:14.875327Z 0 [Note] Server hostname (bind-address): '*'; port: 3306
2016-06-09T15:38:14.875375Z 0 [Note] IPv6 is available.
2016-06-09T15:38:14.875396Z 0 [Note]   - '::' resolves to '::';
2016-06-09T15:38:14.875423Z 0 [Note] Server socket created on IP: '::'.
2016-06-09T15:38:14.877831Z 0 [Warning] 'db' entry 'sys mysql.sys@localhost' ignored in --skip-name-resolve mode.
2016-06-09T15:38:14.877887Z 0 [Warning] 'proxies_priv' entry '@ root@localhost' ignored in --skip-name-resolve mode.
2016-06-09T15:38:14.879826Z 0 [Warning] 'tables_priv' entry 'sys_config mysql.sys@localhost' ignored in --skip-name-resolve mode.
2016-06-09T15:38:14.894606Z 0 [Note] Event Scheduler: Loaded 0 events
2016-06-09T15:38:14.895106Z 0 [Note] mysqld: ready for connections.
Version: '5.7.12-log'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)

Notice that the MySQL server starts and stops a few times as the configuration is modified. The mysqld: ready for connections line reports that the MySQL server is running.

Start a MySQL command line client

Open a new terminal and configure it with the Docker environment. In that terminal, run the following to start a new container to run the MySQL command line client and connect it to the MySQL server running in the mysql container:

$ docker run -it --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

Here we start the container using the mysql:5.7 image, name the container mysqlterm and link it to the mysql container where the database server is running. The --rm option tells Docker to remove the container when it stops, and the rest of the command defines the shell command that the container should run. This shell command runs the MySQL command line client and specifies the correct options so that it can connect properly.
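Those MYSQL_* environment variables are injected by Docker’s --link mechanism. Since --link also makes the hostname mysql resolve inside the new container, an equivalent and somewhat simpler form of the command (assuming the root password debezium we set earlier) would be:

$ docker run -it --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -hmysql -P3306 -uroot -pdebezium'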

The container should output lines similar to the following:

mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 2

Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

Unlike the other containers, this container runs a process that produces a prompt. We’ll use the prompt to interact with the database. First, switch to the "inventory" database:

mysql> use inventory;

and then list the tables in the database:

mysql> show tables;

which should then display:

+---------------------+
| Tables_in_inventory |
+---------------------+
| customers           |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
4 rows in set (0.00 sec)

Use the MySQL command line client to explore the database and view the pre-loaded data in the database. For example:

mysql> SELECT * FROM customers;

Monitor the MySQL database

At this point we are running the Debezium services, a MySQL database server with a sample inventory database, and the MySQL command line client that is connected to our database. The next step is to register a connector that will begin monitoring the MySQL database server’s binlog and generate change events for each row that has been (or will be) changed. Since this is a new connector, when it starts it will start reading from the beginning of the MySQL binlog, which records all of the transactions, including individual row changes and changes to the schemas.

It is essential that the connector keep track of the schema changes, because each row change is recorded in the binlog in terms of the structure of its table at the time the row was changed. As our connector reads the binlog, the connector is actually replaying the history of the database and must keep track of the structure of each table to properly interpret the row changes. MySQL records in the binlog all DDL statements that change the database schema, so Debezium’s MySQL connector parses and uses these DDL statements to maintain an in-memory model of the structure of each table. It also records these DDL statements in a separate Kafka topic so that the connector can recover the structure of the database that existed at any point in time, as defined by the statements in the binlog.

So before we start the connector, we need to create that Kafka topic where the connector can write out the database’s schema history. We’ll use the debezium/kafka image to start a container that runs the Kafka utility to create a schema-changes.inventory topic.

Go back to your terminal where you ran the curl commands against the Kafka Connect service, and run the following to create the topic:

$ docker run -it --rm --link zookeeper:zookeeper debezium/kafka:0.2 create-topic -r 1 schema-changes.inventory

The command runs a container using version 0.2 of the debezium/kafka image, uses --rm to tell Docker to remove the container when it stops, and links to the Zookeeper container so that the utility can find the Kafka broker(s). The command runs the create-topic utility, which by default creates a topic with one partition - exactly what we want so that the total order of all DDL statements is maintained. The -r 1 argument specifies that the topic should have 1 replica.

Normally we’d want 3 or more replicas to reduce the risk of losing data should brokers fail. But since we’re just running a single broker in our tutorial, we can only specify 1 replica.

You’ll see output similar to the following:

Creating new topic schema-changes.inventory with 1 partition(s) and 1 replica(s)...
Created topic "schema-changes.inventory".

The container exits as soon as the request to create the topic completes, and because --rm is used Docker will remove the container, too.
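As an aside, in a deployment with several brokers you would simply ask for more replicas with the same utility. The -r flag is described above; the -p flag for choosing the partition count is an assumption based on the image’s create-topic usage, so verify it against the image documentation:

$ docker run -it --rm --link zookeeper:zookeeper debezium/kafka:0.2 create-topic -r 3 -p 1 schema-changes.inventory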

Now we’re ready to start our connector. Using the same terminal, we’ll use curl to submit to our Kafka Connect service a JSON request message with information about our connector:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.100:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "192.168.99.100", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "mysql-server-1", "database.binlog": "mysql-bin.000001", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory" } }'

This command and several others use 192.168.99.100 as the IP address, which in my case is the IP address of the Docker host when using Docker Machine. If you’re using Docker Machine, use docker-machine ip to get the IP address of your Docker host. If you’re running Linux, get the IP address of your machine and update the curl command to use that address.

This command uses the Kafka Connect service’s RESTful API to submit a POST request against the /connectors resource with a JSON document that describes our new connector. Here’s the same JSON message in a more readable format:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.99.100",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "mysql-server-1",
        "database.binlog": "mysql-bin.000001",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

The JSON message specifies the connector name as inventory-connector, and provides the detailed configuration properties for our MySQL connector:

  • Exactly one task should operate at any one time. Since the MySQL connector reads the MySQL server’s binlog, using a single connector task is the only way to ensure the proper order and that all events are handled properly.

  • The database host and port are specified.

  • The MySQL database we’re running has a debezium user set up expressly for our purposes, so we specify that username and password here.

  • A unique server ID and name are given. The server name is the logical identifier for the MySQL server or cluster of servers, and will be used as the prefix for all Kafka topics.

  • The name of the initial binlog file is given. We start at the first file, but you can alternatively specify others.

  • We only want to detect changes in the inventory database, so we use a whitelist.

  • The connector should store the history of the database schemas in Kafka using the named broker (the same broker to which we’re sending events) and topic name. Upon restart, the connector will recover the schemas of the database(s) that existed at the point in time in the binlog when the connector should begin reading.
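By the way, long single-line JSON arguments are easy to mistype. An alternative is to save the readable JSON shown above to a file (inventory-connector.json is just an illustrative name) and let curl read it from disk:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.100:8083/connectors/ -d @inventory-connector.json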

Either form of this command should produce a response similar to the following (perhaps a bit more compact):

HTTP/1.1 201 Created
Date: Thu, 09 Jun 2016 15:49:46 GMT
Location: http://192.168.99.100:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 534
Server: Jetty(9.2.12.v20150709)

{
    "name": "inventory-connector",
    "config":{
        "name": "inventory-connector",
        "connector.class":"io.debezium.connector.mysql.MySqlConnector",
        "tasks.max":"1",
        "database.hostname":"192.168.99.100",
        "database.port":"3306",
        "database.user":"debezium",
        "database.password":"dbz",
        "database.server.id":"184054",
        "database.server.name":"mysql-server-1",
        "database.whitelist":"inventory",
        "database.history.kafka.bootstrap.servers":"kafka:9092",
        "database.history.kafka.topic":"schema-changes.inventory"
    },
    "tasks":[]
}

This response describes the connector resource /connectors/inventory-connector that the service just created and includes the connector’s configuration and information about the tasks. Since the connector was just created, the service hasn’t yet finished starting tasks.

We can even use the RESTful API to verify that our connector is included in the list of connectors:

$ curl -H "Accept:application/json" 192.168.99.100:8083/connectors/

which should return the following:

["inventory-connector"]

Recall that the Kafka Connect service uses connectors to start one or more tasks that do the work, and that it will automatically distribute the running tasks across the cluster of Kafka Connect services. Should any of the services stop or crash, those tasks will be redistributed to running services. We can see the tasks when we get the state of the connector:

$ curl -i -X GET -H "Accept:application/json" 192.168.99.100:8083/connectors/inventory-connector

which returns:

HTTP/1.1 200 OK
Date: Thu, 09 Jun 2016 15:51:30 GMT
Content-Type: application/json
Content-Length: 578
Server: Jetty(9.2.12.v20150709)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "debezium",
    "database.server.id": "184054",
    "tasks.max": "1",
    "database.binlog": "mysql-bin.000001",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "database.server.name": "mysql-server-1",
    "database.port": "3306",
    "database.hostname": "192.168.99.100",
    "database.password": "dbz",
    "name": "inventory-connector",
    "database.whitelist": "inventory"
  },
  "tasks": [
    {
      "connector": "inventory-connector",
      "task": 0
    }
  ]
}

Here, we can see that the connector is running a single task (task 0) to do its work. The MySQL connector only supports a single task. After all, MySQL records all of its activities in one binlog, and so the MySQL connector can have at most one reader to get a consistent and totally ordered view of all of those events.
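You can also ask the REST API for just the connector’s tasks, which should list this same single task together with its configuration:

$ curl -H "Accept:application/json" 192.168.99.100:8083/connectors/inventory-connector/tasks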

If we look at the output of our connect container, we should now see lines similar to the following:

....
2016-06-09 16:56:51,811 INFO   MySQL|mysql-server-1|task  Source task Thread[WorkerSourceTask-inventory-connector-0,5,main] finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2016-06-09 16:56:51,815 INFO   MySQL|mysql-server-1|snapshot  Starting snapshot   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:51,815 INFO   MySQL|mysql-server-1|snapshot  Step 0: disabling autocommit and enabling repeatable read transactions   [io.debezium.connector.mysql.SnapshotReader]
Thu Jun 09 16:56:52 UTC 2016 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2016-06-09 16:56:52,191 INFO   MySQL|mysql-server-1|snapshot  Step 1: start transaction with consistent snapshot   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,192 INFO   MySQL|mysql-server-1|snapshot  Step 2: flush and obtain global read lock (preventing writes to database)   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,193 INFO   MySQL|mysql-server-1|snapshot  Step 3: read binlog position of MySQL master   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,195 INFO   MySQL|mysql-server-1|snapshot  Step 4: read list of available databases   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,195 INFO   MySQL|mysql-server-1|snapshot  Step 5: read list of available tables in each database   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,215 INFO   MySQL|mysql-server-1|snapshot  Step 6: generating DROP and CREATE statements to reflect current database schemas   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,361 INFO   MySQL|mysql-server-1|snapshot  Step 7: releasing global read lock to enable MySQL writes   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,368 INFO   MySQL|mysql-server-1|snapshot  Writes to MySQL prevented for a total of 00:00:00.176   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,369 INFO   MySQL|mysql-server-1|snapshot  Step 8: scanning contents of 4 tables   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,382 INFO   MySQL|mysql-server-1|snapshot  Step 8.1: scanned table 'inventory.customers' in 00:00:00.013   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,390 INFO   MySQL|mysql-server-1|snapshot  Step 8.2: scanned table 'inventory.orders' in 00:00:00.007   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,392 INFO   MySQL|mysql-server-1|snapshot  Step 8.3: scanned table 'inventory.products' in 00:00:00.002   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,394 INFO   MySQL|mysql-server-1|snapshot  Step 8.4: scanned table 'inventory.products_on_hand' in 00:00:00.001   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,394 INFO   MySQL|mysql-server-1|snapshot  Step 8: scanned contents of 4 tables in 00:00:00.025   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,394 INFO   MySQL|mysql-server-1|snapshot  Step 10: committing transaction   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,394 INFO   MySQL|mysql-server-1|snapshot  Step 11: recording completion of snapshot   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,397 INFO   MySQL|mysql-server-1|snapshot  Completed snapshot in 00:00:00.582   [io.debezium.connector.mysql.SnapshotReader]
2016-06-09 16:56:52,838 WARN   ||  Error while fetching metadata with correlation id 0 : {mysql-server-1=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2016-06-09 16:56:53,063 WARN   ||  Error while fetching metadata with correlation id 3 : {mysql-server-1.inventory.customers=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2016-06-09 16:56:53,281 WARN   ||  Error while fetching metadata with correlation id 7 : {mysql-server-1.inventory.orders=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2016-06-09 16:56:53,506 WARN   ||  Error while fetching metadata with correlation id 10 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
2016-06-09 16:56:53,721 WARN   ||  Error while fetching metadata with correlation id 14 : {mysql-server-1.inventory.products_on_hand=LEADER_NOT_AVAILABLE}   [org.apache.kafka.clients.NetworkClient]
Jun 09, 2016 4:56:53 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to 192.168.99.100:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
2016-06-09 16:56:53,947 INFO   MySQL|mysql-server-1|binlog  Connected to MySQL binlog at 192.168.99.100:3306, starting at binlog file 'mysql-bin.000003', pos=154, row=0   [io.debezium.connector.mysql.BinlogReader]
...

Let’s look into this output in more detail. First, notice that Debezium’s log messages make use of mapped diagnostic contexts (MDC), which allow the log messages to include thread-specific information such as the connector type (e.g., MySQL in the log messages above, after the "INFO" or "WARN" field), the logical name of the connector (e.g., mysql-server-1), and the connector’s activity (e.g., snapshot and binlog). These should make it easier to understand what is going on in the multi-threaded Kafka Connect service.

Now, if we look at these log statements, we can see that the connector starts, performs a consistent snapshot with 11 steps, and then starts reading the binlog at the same point where the snapshot was taken. Since our inventory database is quite small, the snapshot process goes quite quickly: 0.582 seconds as shown in one of the log messages above. This may take longer with larger databases, but the log messages do describe which of the 11 steps are performed with a global read lock on the MySQL server. (See the MySQL connector documentation for more details.)

After the snapshot completes, the MySQL connector will generally output very little information using INFO or WARN level messages.

There’s one more thing in these log messages to mention. The five warning log messages near the end of the sample output above sound ominous, but are basically telling us that new Kafka topics were created and Kafka had to assign a new leader. Note the names of the topics:

  • mysql-server-1.inventory.products

  • mysql-server-1.inventory.products_on_hand

  • mysql-server-1.inventory.customers

  • mysql-server-1.inventory.orders

As described in the MySQL connector documentation, all of the topic names start with mysql-server-1, which is the logical name we gave our connector. Each topic name also includes inventory, which is the name of the database. Finally, each topic name concludes with the name of one of the tables in the inventory database. In other words, the data change events describing rows in each table appear in a separate topic.

Let’s look at all of the data change events in the mysql-server-1.inventory.customers topic. Again, we’ll use the debezium/kafka Docker image to start a new container that connects to Kafka to watch the topic from the beginning of the topic:

$ docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka:0.2 watch-topic -a -k mysql-server-1.inventory.customers

Again, we use the --rm flag since we want the container to be removed when it stops, and we use the -a flag on watch-topic to signal that we want to see all events since the beginning of the topic. (If we were to remove the -a flag, we’d see only the events that are recorded in the topic after we start watching.) The -k flag specifies that the output should include the event’s key, which in our case contains the row’s primary key. Here’s the output:

...
Contents of topic mysql-server-1.inventory.customers:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1001}}   {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"mysql-server-1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"name":"mysql-server-1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true},"op":"c","ts_ms":1465580847054}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1002}}   {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"mysql-server-1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"name":"mysql-server-1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true},"op":"c","ts_ms":1465580847054}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1003}}   {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"mysql-server-1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"name":"mysql-server-1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true},"op":"c","ts_ms":1465580847054}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1004}}   {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"mysql-server-1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"name":"mysql-server-1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true},"op":"c","ts_ms":1465580847054}}

This utility keeps watching, so any new events would automatically appear as long as the utility keeps running. This watch-topic utility is very simple and is limited in functionality and usefulness - we use it here simply to get an understanding of the kind of events that our connector generates. Applications that want to consume events would instead use Kafka consumers, and those consumer libraries offer far more flexibility and power. In fact, properly configured clients enable our applications to never miss any events, even when those applications crash or shut down gracefully.
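As a rough illustration, Kafka’s stock console consumer can read the same topic. The command below assumes the debezium/kafka image keeps the Kafka installation under /kafka and lets you run arbitrary commands through its entrypoint, so treat it as a sketch rather than a supported usage:

$ docker run -it --rm --link zookeeper:zookeeper debezium/kafka:0.2 /kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mysql-server-1.inventory.customers --from-beginning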

These events happen to be encoded in JSON, since that’s how we configured our Kafka Connect service. Each event includes one JSON document for the key, and one for the value. Let’s look at the last event in more detail, by first reformatting the event’s key to be easier to read:

{
  "schema": {
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

The event’s key has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload; in our case, that means the payload is a struct named mysql-server-1.inventory.customers.Key that is not optional and has one required field named id of type int32.

If we look at the value of the key’s payload field, we’ll see that it is indeed a structure (which in JSON is just an object) with a single id field, whose value is 1004.

Therefore, we interpret this event as applying to the row in the inventory.customers table (output from the connector named mysql-server-1) whose id primary key column had a value of 1004.

Now let’s look at the same event’s value, which again we reformat to be easier to read:

{
    "schema": {
      "type": "struct",
      "optional": false,
      "name": "mysql-server-1.inventory.customers.Envelope",
      "version": 1,
      "fields": [
        {
          "field": "op",
          "type": "string",
          "optional": false
        },
        {
          "field": "before",
          "type": "struct",
          "optional": true,
          "name": "mysql-server-1.inventory.customers.Value",
          "fields": [
            {
              "type": "int32",
              "optional": false,
              "field": "id"
            },
            {
              "type": "string",
              "optional": false,
              "field": "first_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "last_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "email"
            }
          ]
        },
        {
          "field": "after",
          "type": "struct",
          "name": "mysql-server-1.inventory.customers.Value",
          "optional": true,
          "fields": [
            {
              "type": "int32",
              "optional": false,
              "field": "id"
            },
            {
              "type": "string",
              "optional": false,
              "field": "first_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "last_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "email"
            }
          ]
        },
        {
          "field": "source",
          "type": "struct",
          "name": "io.debezium.connector.mysql.Source",
          "optional": false,
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "server_id"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_sec"
            },
            {
              "type": "string",
              "optional": true,
              "field": "gtid"
            },
            {
              "type": "string",
              "optional": false,
              "field": "file"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "pos"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "row"
            },
            {
              "type": "boolean",
              "optional": true,
              "field": "snapshot"
            }
          ]
        },
        {
          "field": "ts_ms",
          "type": "int64",
          "optional": true
        }
      ]
    },
    "payload": {
      "before": null,
      "after": {
        "id": 1004,
        "first_name": "Anne",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org"
      },
      "source": {
        "name": "mysql-server-1",
        "server_id": 0,
        "ts_sec": 0,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 154,
        "row": 0,
        "snapshot": true
      },
      "op": "c",
      "ts_ms": 1465491411815
    }
  }

This portion of the event is much larger, but like the event’s key this, too, has a schema and a payload. The schema contains a Kafka Connect schema named mysql-server-1.inventory.customers.Envelope (version 1) that can contain 5 fields:

  • op is a mandatory field that contains a string value describing the type of operation. Values for the MySQL connector are c for create (or insert), u for update, d for delete, and r for read (in the case of a non-initial snapshot).

  • before is an optional field that, if present, contains the state of the row before the event occurred. The structure is described by the mysql-server-1.inventory.customers.Value Kafka Connect schema, which the mysql-server-1 connector uses for all rows in the inventory.customers table.

  • after is an optional field that, if present, contains the state of the row after the event occurred. The structure is described by the same mysql-server-1.inventory.customers.Value Kafka Connect schema used in before.

  • source is a mandatory field that contains a structure describing the source metadata for the event. For MySQL this includes several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), whether this event was part of a snapshot, the MySQL server ID (if available), and the timestamp in seconds.

  • ts_ms is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.

If we look at the payload of the event’s value, we can see the information in the event: it describes that the row was created, and it contains the id, first_name, last_name, and email of the inserted row.

You may have noticed that the JSON representations of the events are much larger than the rows they describe. This is because Kafka Connect ships the schema that describes the payload with every event key and value. Over time, this structure may change, and having the schemas for the key and value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time.
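
Incidentally, if your consumers do not need the schemas, Kafka Connect can be configured to leave them out. Here is a minimal sketch of the relevant worker properties, assuming a worker configured through its properties file (this tutorial configures the worker through environment variables instead, so treat this only as an illustration):

# Standard Kafka Connect JSON converter settings; with schemas.enable=false,
# each serialized message contains only the payload portion shown above.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false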

The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. This ensures that each event is structured exactly like the table from which it originated at the time the event occurred. Note, though, that the Kafka topic containing all of the events for a single table might contain events that correspond to each state of the table definition.

The JSON converter does produce very verbose events, since it includes the key and value schemas in every message. The Avro converter, on the other hand, is far smarter and results in far smaller event messages. It transforms each Kafka Connect schema into an Avro schema and stores the Avro schemas in a separate Schema Registry service, so when the Avro converter serializes an event message, it places only a unique identifier for the schema along with an Avro-encoded binary representation of the value. Thus, the serialized messages transferred over the wire and stored in Kafka are far smaller than those that appear above. In fact, the Avro converter is able to use Avro schema evolution techniques to maintain the history of each schema in the Schema Registry.
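
For reference, switching to the Avro converter is also just a matter of worker configuration. A sketch, assuming Confluent’s Avro converter is on the worker’s classpath and a Schema Registry is reachable at http://schema-registry:8081 (neither is part of this tutorial’s setup):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081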

We can compare these events to the state of the database. Go back to the terminal that is running the MySQL command line client, and run the following statement:

mysql> SELECT * FROM customers;

which produces the following output:

+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

As we can see, all of our event records match the database.

Now that we’re monitoring changes, what happens when we change one of the records in the database? Run the following statement in the MySQL command line client:

mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

which produces the following output:

Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Rerun the select statement to see the updated table:

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

Now, go back to the terminal running watch-topic, where we should see a new fifth event:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1004}}   {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"mysql-server-1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"name":"mysql-server-1","server_id":223344,"ts_sec":1465581,"gtid":null,"file":"mysql-bin.000003","pos":484,"row":0,"snapshot":null},"op":"u","ts_ms":1465581029523}}

Let’s reformat the new event’s key to be easier to read:

{
  "schema": {
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

This key is exactly the same as the one we saw in the fourth record. Here’s that new event’s value formatted to be easier to read:

{
    "schema": {
      "type": "struct",
      "optional": false,
      "name": "mysql-server-1.inventory.customers.Envelope",
      "version": 1,
      "fields": [
        {
          "field": "op",
          "type": "string",
          "optional": false
        },
        {
          "field": "before",
          "type": "struct",
          "optional": true,
          "name": "mysql-server-1.inventory.customers.Value",
          "fields": [
            {
              "type": "int32",
              "optional": false,
              "field": "id"
            },
            {
              "type": "string",
              "optional": false,
              "field": "first_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "last_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "email"
            }
          ]
        },
        {
          "field": "after",
          "type": "struct",
          "name": "mysql-server-1.inventory.customers.Value",
          "optional": true,
          "fields": [
            {
              "type": "int32",
              "optional": false,
              "field": "id"
            },
            {
              "type": "string",
              "optional": false,
              "field": "first_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "last_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "email"
            }
          ]
        },
        {
          "field": "source",
          "type": "struct",
          "name": "io.debezium.connector.mysql.Source",
          "optional": false,
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "server_id"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_sec"
            },
            {
              "type": "string",
              "optional": true,
              "field": "gtid"
            },
            {
              "type": "string",
              "optional": false,
              "field": "file"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "pos"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "row"
            },
            {
              "type": "boolean",
              "optional": true,
              "field": "snapshot"
            }
          ]
        },
        {
          "field": "ts_ms",
          "type": "int64",
          "optional": true
        }
      ]
    },
    "payload": {
      "before": {
        "id": 1004,
        "first_name": "Anne",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org"
      },
      "after": {
        "id": 1004,
        "first_name": "Anne Marie",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org"
      },
      "source": {
        "name": "mysql-server-1",
        "server_id": 223344,
        "ts_sec": 1465581,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 484,
        "row": 0,
        "snapshot": null
      },
      "op": "u",
      "ts_ms": 1465581029523
    }
}

When we compare this to the value in the fourth event, we see no changes in the schema section and a couple of changes in the payload section:

  • The op field value is now u, signifying that this row changed because of an update.

  • The before field now has the state of the row with the values before the database commit.

  • The after field now has the updated state of the row, and here we can see that the first_name value is now Anne Marie.

  • The source field structure has many of the same values as before, except the ts_sec and pos fields have changed (and the file might have changed in other circumstances).

  • The ts_ms field shows the timestamp at which Debezium processed this event.

There are several things we can learn just by looking at this payload section. We can compare the before and after structures to determine what actually changed in this row because of the commit. The source structure tells us about MySQL’s record of this change (providing traceability), but more importantly it has information we can compare to other events in this and other topics to know whether this event occurred before, after, or as part of the same MySQL commit as other events.
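
If you would like to see how the file and pos values in source line up with MySQL’s own record, you can ask the server for its current binlog position in the MySQL command line client; the File and Position columns in the output correspond to the file and pos fields above:

mysql> SHOW MASTER STATUS;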

So far we’ve seen samples of create and update events. Now, let’s look at delete events. Since Anne Marie has not placed any orders, we can remove her record from our database using the MySQL command line client:

mysql> DELETE FROM customers WHERE id=1004;

In our terminal running watch-topic, we see two new events:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1004}}   {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"mysql-server-1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":null,"source":{"name":"mysql-server-1","server_id":223344,"ts_sec":1465581,"gtid":null,"file":"mysql-bin.000003","pos":805,"row":0,"snapshot":null},"op":"d","ts_ms":1465581902461}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1004}}   {"schema":null,"payload":null}

What happened? We only deleted one row, but we now have two events. To understand what the MySQL connector does, let’s look at the first of our two new messages. Here’s the key reformatted to be easier to read:

{
  "schema": {
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

Once again, this key is exactly the same key as in the previous two events we looked at. Here’s the value of the first new event, formatted to be easier to read:

{
    "schema": {
      "type": "struct",
      "optional": false,
      "name": "mysql-server-1.inventory.customers.Envelope",
      "version": 1,
      "fields": [
        {
          "field": "op",
          "type": "string",
          "optional": false
        },
        {
          "field": "before",
          "type": "struct",
          "optional": true,
          "name": "mysql-server-1.inventory.customers.Value",
          "fields": [
            {
              "type": "int32",
              "optional": false,
              "field": "id"
            },
            {
              "type": "string",
              "optional": false,
              "field": "first_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "last_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "email"
            }
          ]
        },
        {
          "field": "after",
          "type": "struct",
          "name": "mysql-server-1.inventory.customers.Value",
          "optional": true,
          "fields": [
            {
              "type": "int32",
              "optional": false,
              "field": "id"
            },
            {
              "type": "string",
              "optional": false,
              "field": "first_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "last_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "email"
            }
          ]
        },
        {
          "field": "source",
          "type": "struct",
          "name": "io.debezium.connector.mysql.Source",
          "optional": false,
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "server_id"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_sec"
            },
            {
              "type": "string",
              "optional": true,
              "field": "gtid"
            },
            {
              "type": "string",
              "optional": false,
              "field": "file"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "pos"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "row"
            },
            {
              "type": "boolean",
              "optional": true,
              "field": "snapshot"
            }
          ]
        },
        {
          "field": "ts_ms",
          "type": "int64",
          "optional": true
        }
      ]
    },
    "payload": {
      "before": {
        "id": 1004,
        "first_name": "Anne Marie",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org"
      },
      "after": null,
      "source": {
        "name": "mysql-server-1",
        "server_id": 223344,
        "ts_sec": 1465581,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 805,
        "row": 0,
        "snapshot": null
      },
      "op": "d",
      "ts_ms": 1465581902461
    }
}

Here we see a few things:

  • The op field value is now d, signifying that this row was deleted.

  • The before field now has the state of the row that was deleted by the database commit.

  • The after field is null, signifying that the row no longer exists.

  • The source field structure has many of the same values as before, except the ts_sec and pos fields have changed (and the file might have changed in other circumstances).

  • The ts_ms field shows the timestamp at which Debezium processed this event.

This event gives a consumer all kinds of information that it can use to process the removal of this row. We include the old values because some consumers might require them in order to properly handle the removal; without them, consumers may have to resort to far more complex behavior.

Remember that we saw two events when we deleted the row? Let’s look at that second event. Here’s the key for the event:

{
  "schema": {
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

Once again, this key is exactly the same key as in the previous three events we looked at. Here’s the value of that same event:

{
  "schema": null,
  "payload": null
}

What gives? Well, all of the Kafka topics that the MySQL connector writes to can be set up to be log compacted, which means that Kafka can remove older messages from the topic as long as there is at least one later message in the topic with the exact same key. This is Kafka’s way of collecting garbage. This last event is what Debezium calls a tombstone event, and because it has a key and a null value, Kafka understands that it can remove all prior messages with this same key.

Kafka log compaction is great, because it still allows consumers to read the topic from the very beginning and not miss any events.
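
Note that compaction only happens on topics where it has been enabled. As a sketch, you could enable it on our customers topic using the Kafka tools inside the kafka container (the /kafka/bin path is an assumption about the debezium/kafka image; adjust it to your installation):

$ docker exec -it kafka /kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic mysql-server-1.inventory.customers --config cleanup.policy=compact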

Restart the Kafka Connect service

One feature of the Kafka Connect service is that it automatically manages tasks for the registered connectors. And because it stores its data in Kafka, if a running service stops or goes away completely, upon restart (perhaps on another host) the service will start any non-running tasks. To demonstrate this, let’s stop our Kafka Connect service, change some data in the database, and restart our service.

In a new terminal, use the following Docker commands to stop and remove the connect container that is running our Kafka Connect service:

$ docker stop connect
$ docker rm connect

Stopping the container like this stops the process running inside of it, and the Kafka Connect service handles this by shutting down gracefully. Removing the container ensures that we won’t simply restart the old container when we bring the service back.

While the service is down, let’s go back to the MySQL command line client and add a few records:

mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");

Notice that in the terminal where we’re running watch-topic, there’s been no update. We’re still able to watch the topic, though, because Kafka is still running. (In a production system, you would have enough brokers to handle the producers and consumers, and to maintain a minimum number of in-sync replicas for each topic. If enough brokers failed such that the minimum number of ISRs could no longer be maintained, Kafka would become unavailable. Producers, like the Debezium connectors, and consumers would simply wait patiently for the Kafka cluster or network to recover. Yes, that means your consumers might temporarily see no change events as data is changed in the databases, but that’s because none are being produced. As soon as the Kafka cluster is restarted or the network recovers, Debezium will resume producing change events, and your consumers will resume consuming events where they left off.)
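
For context, the production behavior described in the note above is governed by a few standard Kafka broker and producer settings. The following values are purely illustrative and are not part of this tutorial’s single-broker setup:

# Broker configuration: keep multiple copies of each partition,
# and require at least two in-sync replicas to acknowledge a write.
default.replication.factor=3
min.insync.replicas=2

# Producer configuration: wait for all in-sync replicas to acknowledge.
acks=all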

Now, in a new terminal, start a new container using the same command we used before:

$ docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3  -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link kafka:kafka debezium/connect:0.2

This creates a whole new container, and since we’ve initialized it with the same topic information, the new service can connect to Kafka, read the previous service’s configuration, and start the registered connectors, which will continue exactly where they last left off.
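
You can verify that the restarted service found the previously registered connectors by querying its REST API, which lists the names of all registered connectors (if you are using Docker Machine, substitute the virtual machine’s IP address for localhost):

$ curl -H "Accept:application/json" localhost:8083/connectors/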

Jump back to the terminal running watch-topic, and you should now see the two new records we added to the MySQL database:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1005}}   {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"mysql-server-1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"name":"mysql-server-1","server_id":223344,"ts_sec":1465583,"gtid":null,"file":"mysql-bin.000003","pos":1115,"row":0,"snapshot":null},"op":"c","ts_ms":1465583022619}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"mysql-server-1.inventory.customers.Key"},"payload":{"id":1006}}   {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"mysql-server-1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"mysql-server-1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"name":"mysql-server-1","server_id":223344,"ts_sec":1465583,"gtid":null,"file":"mysql-bin.000003","pos":1429,"row":0,"snapshot":null},"op":"c","ts_ms":1465583031183}}

These events are create events similar to what we saw before. The important point to understand, though, is that Debezium will still report all of the changes made to a database while it was not running, as long as it is restarted before the MySQL server purges from its binlog the commits that were missed.
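
If you are curious how long your MySQL server retains its binlog, you can ask it directly in the MySQL command line client (an expire_logs_days value of 0 means the server never purges binlogs automatically):

mysql> SHOW BINARY LOGS;
mysql> SHOW VARIABLES LIKE 'expire_logs_days';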

Exploration

Go ahead and use the MySQL command line client to add rows to, modify, and remove rows from the database tables, and see the effect on the topics. You may need to run a separate watch-topic command for each topic, as sketched below. And remember that you can’t remove a row that is referenced by a foreign key. Have fun!
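
For example, to watch another table’s topic you could start a second watcher in its own terminal. This sketch assumes the same watch-topic approach used earlier in this tutorial, pointed at the topic for the products table:

$ docker run -it --rm --link zookeeper:zookeeper debezium/kafka:0.2 watch-topic -a -k mysql-server-1.inventory.products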

Clean up

You can use Docker to stop and remove all of the running containers:

$ docker stop mysqlterm watcher connect mysql kafka zookeeper
$ docker rm connect mysql kafka zookeeper

Then verify that all of the containers are stopped and removed:

$ docker ps -a

If any containers are still running, you can stop them using docker stop <name> or docker stop <containerId>.
