Serializing Debezium events with Avro

Although Debezium makes it easy to capture database changes and record them in Kafka, one of the more important decisions you have to make is how those change events will be serialized in Kafka. Every message in Kafka has a key and a value, and to Kafka these are opaque byte arrays. But when you set up Kafka Connect, you have to say how the Debezium event keys and values should be serialized to a binary form, and your consumers will also have to deserialize them back into a usable form.

Debezium event keys and values are both structured, so JSON is certainly a reasonable option: it's flexible, ubiquitous, and language agnostic, but it's also quite verbose. One alternative is Avro, which is also flexible and language agnostic, but is faster and produces smaller binary representations. Using Avro requires a bit more setup effort on your part and some additional software, but the advantages are often worth it.

Kafka serializers and deserializers

Before we get too far, let’s back up and review how Kafka producers and consumers normally do this serialization and deserialization. Because the keys and values are simple opaque byte arrays, you can use anything for your keys and values. For example, consider a case where we’re using simple whole numbers for the keys and strings for the values. Here, a producer of these messages would use a long serializer to convert the long keys to binary form and a string serializer to convert the String values to binary form. Meanwhile, the consumers use a long deserializer to convert the binary keys into usable long values, and a string deserializer to convert the binary values back into String objects.
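
For example, a producer with long keys and String values might be configured like the following minimal sketch, which uses the standard Kafka Java client; the broker address and topic name are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LongKeyStringValueProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        // Convert the long keys and String values to binary form
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<Long, String> producer = new KafkaProducer<>(props)) {
            // The key is a whole number, the value a simple string
            producer.send(new ProducerRecord<>("example-topic", 42L, "some value"));
        }
    }
}

A consumer of this topic would mirror the configuration, using LongDeserializer and StringDeserializer for the key and value deserializers.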

In cases where the keys and/or values need to be a bit more structured, the producers and consumers can be written to use JSON structures for keys and/or values, and the Kafka-provided JSON serializer and deserializer to do the conversion to and from the binary form stored within the Kafka messages. As we said earlier, using JSON for keys and/or values is very flexible and language agnostic, but it also produces keys and values that are relatively large, since the fields and structure of the JSON values need to be encoded as well.

Avro serialization

Avro is a data serialization mechanism that uses a schema to define the structure of data. Avro relies upon this schema when writing the data to the binary format, and the schema allows it to encode the fields within the data in a much more compact form. Avro relies upon the schema when reading the data, too. But interestingly, Avro schemas are designed to evolve, so it is actually possible to use a slightly different schema for reading than what was used for writing. This feature makes Avro a great choice for Kafka serialization and deserialization.
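
To make this concrete, here is a small, purely hypothetical Avro schema for a customer record (not a schema generated by Debezium):

{
  "type": "record",
  "name": "Customer",
  "namespace": "inventory",
  "fields": [
    { "name": "id",         "type": "int" },
    { "name": "first_name", "type": "string" },
    { "name": "last_name",  "type": "string" },
    { "name": "email",      "type": ["null", "string"], "default": null }
  ]
}

Because the field names and types live in the schema rather than in each message, the serialized records contain only the field values. And a reader can use a newer version of this schema (say, one that adds another optional field with a default value) to decode records that were written with the older version.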

Confluent provides a Kafka serializer and deserializer that uses Avro and a separate Schema Registry, and it works like this: when a numeric or string object is to be serialized, the Avro serializer determines the corresponding Avro schema for the given type, registers that schema and the topic it's used on with the Schema Registry, gets back the schema's unique identifier, and then encodes the schema identifier and the encoded value in the binary form. The next message is likely to have the same type and thus the same schema, so the serializer can quickly encode the schema identifier and value for this message without having to talk to the Schema Registry. Only when it needs to serialize a schema it hasn't already seen does the Avro serializer talk with the Schema Registry. So not only is this fast, but it also produces very compact binary forms and allows the producer to evolve its key and/or value schemas over time. The Schema Registry can also be configured to allow new versions of schemas to be registered only when they are compatible with the Avro schema evolution rules, ensuring that producers do not produce messages that consumers will not be able to read.
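
For instance, the registry's global compatibility level can be set through its REST API so that incompatible schema versions are rejected at registration time; here is a sketch that assumes the Schema Registry is listening on http://localhost:8081:

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"compatibility": "BACKWARD"}' \
     http://localhost:8081/config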

Consumers, meanwhile, use the Avro deserializer, which works in a similar manner, albeit backwards: when it reads the binary form of a key or value, it first looks up the schema identifier and, if it hasn't seen that identifier before, asks the Schema Registry for the schema, and then uses that schema to decode the remainder of the binary representation into its object form. Again, if the deserializer has previously seen a particular schema identifier, it already has the schema needed to decode the data and doesn't have to consult the Schema Registry.
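
A consumer that wants to read these Avro-encoded messages only needs to point Confluent's Avro deserializer at the same registry. Here is a minimal sketch; the broker address, group id, registry URL, and topic name are all placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroChangeEventConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "example-group");             // placeholder consumer group
        // The Avro deserializer looks up each schema by the identifier embedded in the message
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");  // placeholder registry address

        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));  // placeholder topic
            ConsumerRecords<Object, Object> records = consumer.poll(1000);
            for (ConsumerRecord<Object, Object> record : records) {
                // Without further configuration the deserializer yields Avro GenericRecord objects
                System.out.println(record.key() + " -> " + record.value());
            }
        }
    }
}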

Kafka Connect converters

Kafka Connect is a bit different from many Kafka producers and consumers, since the keys and values will often be structured. Rather than require connectors to work with JSON objects, Kafka Connect defines its own lightweight framework for defining data structures with a schema, making it much easier to write connectors that work with structured data. Kafka Connect also defines its own converters, which are similar to Kafka (de)serializers except that they know about these structures and schemas and can serialize the keys and values to binary form. Kafka Connect provides a JSON converter that converts the structures into JSON and then uses the normal Kafka JSON serializer, so downstream consumers can just use the normal Kafka JSON deserializer and get a JSON representation of the Kafka Connect structs and schemas. This is exactly what the Debezium tutorial uses, and the watch-topic consumer knows to use the JSON deserializer.
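
For reference, the worker configuration that the tutorial relies upon boils down to something like the following; the schemas.enable flags control whether the Connect schema is embedded alongside each JSON payload (which is why the tutorial's messages contain both a schema and a payload section):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true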

One great feature of Kafka Connect is that the connectors simply provide the structured messages, and Kafka Connect takes care of serializing them using the configured converter. This means that you can use any Kafka Connect converters with any Kafka Connect connector, including all of Debezium’s connectors.

Kafka Connect’s schema system was designed specifically with Avro in mind, so there is a one-to-one mapping between Kafka Connect schemas and Avro schemas. Confluent provides an Avro Converter for Kafka Connect that serializes the Kafka Connect structs provided by the connectors into the compact Avro binary representation, again using the Schema Registry just like the Avro serializer. The consumer just uses the normal Avro deserializer as mentioned above.

Using Avro for serialization of Debezium events brings several significant advantages:

  1. The encoded binary forms of the Debezium events are significantly smaller than the JSON representations. Not only is the structured data encoded in a more compact form, but the schema associated with that structured data is represented in the binary form as a single integer.

  2. Encoding the Debezium events into their Avro binary forms is fast. Only when the converter sees a new schema does it have to consult with the Schema Registry; otherwise, the schema has already been seen and its encoding logic already precomputed.

  3. The Avro Converter for Kafka Connect produces messages with Avro-encoded keys and values that can be read by any Kafka consumers using the Avro deserializer.

  4. Debezium event structures are based upon the structure of the table from which the changes were captured. When the structure of the source table changes (e.g., because an ALTER statement was applied to it), the structure and schema of the events will also change. If this is done in a manner such that the new Avro schema is compatible with the older Avro schema, then consumers will be able to process the events without disruption, even though the event structures evolve over time.

  5. Avro’s schema mechanism is far more formal and rigorous than the free-form JSON structure, and the changes in the schemas are clearly identified when comparing any two messages.

  6. The Avro converter, Avro (de)serializers, and Schema Registry are all open source.

It is true that using the Avro converter and deserializer requires a running Schema Registry, and that the registry becomes an integral part of your streaming infrastructure. However, this is a small price to pay for the benefits listed above.

Using the Avro Converter with Debezium

As mentioned above, in the interest of keeping the Debezium tutorial as simple as possible, we avoid using the Schema Registry or the Avro converter in the tutorial. We also don’t (yet) include the Avro converter in our Docker images, though that will change soon.

Nevertheless, it is absolutely possible to use the Avro Converter with the Debezium connectors when you are installing the connectors into either the Confluent Platform or into your own installation of Kafka Connect. Simply configure the Kafka Connect workers to use the Avro converter for the keys and values:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

And, if you want to use the Avro Converter for Kafka Connect internal messages, then set these as well:

internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
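
In both cases the Avro converter also has to be told where the Schema Registry lives, which is done with additional schema.registry.url properties (the address below is just a placeholder):

key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081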

Once again, there is no need to configure the Debezium connectors any differently.


Debezium on Kubernetes

Our Debezium Tutorial walks you step by step through using Debezium by installing, starting, and linking together all of the Docker containers running on a single host machine. Of course, you can use things like Docker Compose or your own scripts to make this easier, although that would just automate running all the containers on a single machine. What you really want is to run the containers on a cluster of machines. In this blog, we'll run Debezium using a container cluster manager from Red Hat and Google called Kubernetes.

Kubernetes is a container (Docker/Rocket/Hyper.sh) cluster management tool. Like many other popular cluster management and compute resource scheduling platforms, Kubernetes has its roots in Google, which is no stranger to running containers at scale. They start, stop, and cluster 2 billion containers per week, and they contributed a lot of the Linux kernel underpinnings that make containers possible. One of their famous papers talks about an internal cluster manager named Borg. With Kubernetes, Google got tired of everyone implementing their papers in Java, so they decided to implement this one themselves :)

Kubernetes is written in Go and is quickly becoming the de facto API for scheduling, managing, and clustering containers at scale. This blog isn't intended to be a primer on Kubernetes, so we recommend heading over to the Getting Started docs to learn more about Kubernetes.

Getting started

To get started, we need to have access to a Kubernetes cluster. Getting one started is pretty easy: just follow the getting started guides. A favorite of ours is OpenShift's all-in-one VM or the Red Hat Container Development Kit, which provide a hardened, production-ready distribution of Kubernetes. Once you've installed it and logged in, you should be able to run kubectl get pod to get a list of Kubernetes pods you may have running. You don't need anything else running inside Kubernetes to get started.

To get and build the Kubernetes manifest files (yaml descriptors), go clone the Debezium Kubernetes repo and run the following command:

$ mvn clean
$ mvn install

This project uses the awesome Fabric8 Maven plugin to automatically generate the Kubernetes manifest files. Here's an example of what gets generated in $PROJECT_ROOT/zk-standalone/target/classes/kubernetes.yml:

apiVersion: "v1"
items:
- apiVersion: "v1"
  kind: "Service"
  metadata:
    annotations: {}
    labels:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    name: "zookeeper"
  spec:
    deprecatedPublicIPs: []
    externalIPs: []
    ports:
    - port: 2181
      protocol: "TCP"
      targetPort: 2181
    selector:
      project: "zookeeper"
      provider: "debezium"
      group: "io.debezium"
- apiVersion: "v1"
  kind: "ReplicationController"
  metadata:
    annotations:
      fabric8.io/git-branch: "master"
      fabric8.io/git-commit: "004e222462749fbaf12c3ee33edca9b077ee9003"
    labels:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    name: "zk-standalone"
  spec:
    replicas: 1
    selector:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    template:
      metadata:
        annotations: {}
        labels:
          project: "zookeeper"
          provider: "debezium"
          version: "0.1-SNAPSHOT"
          group: "io.debezium"
      spec:
        containers:
        - args: []
          command: []
          env:
          - name: "KUBERNETES_NAMESPACE"
            valueFrom:
              fieldRef:
                fieldPath: "metadata.namespace"
          image: "docker.io/debezium/zookeeper:0.1"
          imagePullPolicy: "IfNotPresent"
          name: "zk-standalone"
          ports:
          - containerPort: 3888
            name: "election"
          - containerPort: 2888
            name: "peer"
          - containerPort: 2181
            name: "client"
          securityContext: {}
          volumeMounts: []
        imagePullSecrets: []
        nodeSelector: {}
        volumes: []
kind: "List"

Starting Zookeeper and Kafka on Kubernetes

To start Apache Zookeeper or Apache Kafka inside Kubernetes you have two options. If you have the kubectl command line (or the oc tool from the OpenShift client distros) on your machine you can apply any of the newly generated Kubernetes manifest files like this:

$ kubectl create -f <path_to_file>

Or you can use the Fabric8 Maven plugin and its fabric8:apply goal to apply the manifest files. Note that for either of these two options to work, you must be currently logged into your Kubernetes cluster. (OpenShift's oc login <url> makes this super easy, or see Logging into a Kubernetes Cluster with kubectl for more information.)

First, let’s deploy Zookeeper to our Kubernetes cluster. We need to be in $PROJECT_ROOT/zk-standalone directory, and then we’ll apply our Kubernetes configuration. First, let’s see how to do this with the kubectl command:

$ cd zk-standalone
$ kubectl create -f target/classes/kubernetes.yml

service "zookeeper" created
replicationcontroller "zk-standalone" created

You can do the same thing with Maven and the fabric8 maven plugin:

$ cd zk-standalone
$ mvn fabric8:apply

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building zk-standalone 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ zk-standalone ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/zk-standalone/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name zookeeper
[INFO] Created Service: zk-standalone/target/fabric8/applyJson/ticket/service-zookeeper.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name zk-standalone
[INFO] Created ReplicationController: zk-standalone/target/fabric8/applyJson/ticket/replicationcontroller-zk-standalone.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.661 s
[INFO] Finished at: 2016-05-19T15:59:26-07:00
[INFO] Final Memory: 26M/260M
[INFO] ------------------------------------------------------------------------

Zookeeper is deployed, so let’s continue with deploying Kafka. Navigate to $PROJECT_ROOT/kafka, and then apply the Kafka deployment configuration:

$ cd ../kafka
$ kubectl create -f target/classes/kubernetes.yml

service "kafka" created
replicationcontroller "kafka" created

Or with fabric8 maven plugin:

$ cd ../kafka
$ mvn fabric8:apply

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ kafka ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/kafka/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name kafka
[INFO] Created Service: kafka/target/fabric8/applyJson/ticket/service-kafka.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name kafka
[INFO] Created ReplicationController: kafka/target/fabric8/applyJson/ticket/replicationcontroller-kafka.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.563 s
[INFO] Finished at: 2016-05-19T16:03:25-07:00
[INFO] Final Memory: 26M/259M
[INFO] ------------------------------------------------------------------------

Use the kubectl get pod command to see what is running:

$ kubectl get pod

NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          46s
zk-standalone-4mo02   1/1       Running   0          4m

Did you notice that we didn't manually "link" the containers as we started them? Kubernetes has a cluster service-discovery feature called Kubernetes Services, which load-balances across a set of pods and lets us use internal DNS names (or cluster IPs) to discover them. For example, in the kubernetes.yml deployment configuration for Kafka, you'll see the following:

    ...
    containers:
    - args: []
      command: []
      env:
      - name: "KAFKA_ADVERTISED_PORT"
        value: "9092"
      - name: "KAFKA_ADVERTISED_HOST_NAME"
        value: "kafka"
      - name: "KAFKA_ZOOKEEPER_CONNECT"
        value: "zookeeper:2181"
      - name: "KAFKA_PORT"
        value: "9092"
      - name: "KUBERNETES_NAMESPACE"
        valueFrom:
          fieldRef:
            fieldPath: "metadata.namespace"
      image: "docker.io/debezium/kafka:0.1"
      imagePullPolicy: "IfNotPresent"
      name: "kafka"
    ...

We’re specifying values for the KAFKA_ZOOKEEPER_CONNECT environment variable used by the Docker image, and thus enabling Kafka to discover Zookeeper pods wherever they are running. Although we could have used any hostname, to keep things simple we use just zookeeper for the DNS name. So, if you were to log in to one of the pods and try to reach the host named zookeeper, Kubernetes would transparently resolve that request to one of the Zookeeper pods (if there are multiple). Slick! This discovery mechanism is used for the rest of the components, too. (Note, this cluster IP that the DNS resolves to never changes for the life of the Kubernetes Service regardless of how many Pods exist for a given service. This means you can rely on this service discovery without all of the DNS caching issues you may otherwise run into.)
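
You can see this resolution in action from inside any pod in the namespace. For example, assuming the Kafka image includes the getent utility (most glibc-based images do), the following should print the cluster IP that the zookeeper service name resolves to:

$ kubectl exec <kafka-pod-name> -- getent hosts zookeeper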

The next step is to create a schema-changes topic that Debezium’s MySQL connector will use. Let’s use the Kafka tools to create this:

$ KAFKA_POD_NAME=$(kubectl get pod | grep -i running | grep kafka | awk '{ print $1 }')

$ kubectl exec $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic schema-changes.inventory

Start up a MySQL Database on Kubernetes

Starting the MySQL database follows the same instructions as installing Zookeeper or Kafka. We will navigate to the $PROJECT_ROOT/mysql56 directory, and we’ll use the MySQL 5.6 OpenShift Docker image so that it runs on both vanilla Kubernetes and OpenShift v3.x. Here’s the kubectl command to start up our MySQL instance:

$ cd ../mysql56
$ kubectl create -f target/classes/kubernetes.yml

service "mysql" created
replicationcontroller "mysql56" created

Or the equivalent Maven command:

$ cd mysql56
$ mvn fabric8:apply

Now, when we run kubectl get pod we should see our MySQL database running, too:

NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          17m
mysql56-b4f36         1/1       Running   0          9m
zk-standalone-4mo02   1/1       Running   0          21m

Let’s run a command to get client access to the database. First, set a few environment variables to the pod’s name and IP address:

$ MYSQL_POD_NAME=$(kubectl get pod | grep Running | grep ^mysql | awk '{ print $1 }')
$ MYSQL_POD_IP=$(kubectl describe pod $MYSQL_POD_NAME | grep IP | awk '{ print $2 }')

Then, log in to the Kubernetes pod that’s running the MySQL database, and start the MySQL command client:

$ kubectl exec -it $MYSQL_POD_NAME   -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin
Warning: Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 1
Server version: 5.6.26-log MySQL Community Server (GPL)

Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

This shows that the kubectl command line lets us easily get access to a pod or Docker container regardless of where it’s running in the cluster.

Next, exit out of the mysql shell (type exit) and run the following command to download a SQL script that populates an inventory sample database:

$ kubectl exec  -it $MYSQL_POD_NAME -- bash -c "curl -s -L https://gist.github.com/christian-posta/e20ddb5c945845b4b9f6eba94a98af09/raw | /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin"

Now, if we log back into the MySQL pod we can show the databases and tables:

$ kubectl exec -it $MYSQL_POD_NAME   -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin -e 'use inventory; show tables;'

+---------------------+
| Tables_in_inventory |
+---------------------+
| customers           |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
4 rows in set (0.00 sec)

Start Kafka Connect and Debezium

Navigate into the $PROJECT_ROOT/connect-mysql directory. Here, we'll start a Kubernetes pod that runs Kafka Connect with the Debezium MySQL connector already installed. The Debezium MySQL connector connects to a MySQL database, reads the binlog, and writes those row events to Kafka. Start up Kafka Connect with Debezium on Kubernetes just like the previous components:

$ cd ../connect-mysql
$ kubectl create -f target/classes/kubernetes.yml

service "connect-mysql" created
replicationcontroller "connect-mysql" created

Or with the fabric8 maven plugin:

$ cd ../connect-mysql
$ mvn fabric8:apply
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building connect-mysql 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ connect-mysql ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/connect-mysql/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name connect-mysql
[INFO] Created Service: connect-mysql/target/fabric8/applyJson/ticket/service-connect-mysql.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name connect-mysql
[INFO] Created ReplicationController: connect-mysql/target/fabric8/applyJson/ticket/replicationcontroller-connect-mysql.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.255 s
[INFO] Finished at: 2016-05-25T09:21:04-07:00
[INFO] Final Memory: 27M/313M
[INFO] ------------------------------------------------------------------------

Just like in the Docker tutorial for Debezium, we now want to send a JSON object to the Kafka Connect API to start up our Debezium connector. First, we need to expose the API for the Kafka Connect cluster. You can do this however you want: on Kubernetes with Ingress definitions, NodePort services, etc., or on OpenShift with OpenShift Routes. For this simple example, we'll use simple pod port-forwarding to forward the connect-mysql pod's 8083 port to our local machine, again regardless of where the pod is actually running in the cluster. (This is such an incredible feature of Kubernetes that makes it so easy to develop distributed services!)

Let’s determine the pod name and then use port forwarding to our local machine:

$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl port-forward $CONNECT_POD_NAME 8083:8083

I0525 09:30:08.390491    6651 portforward.go:213] Forwarding from 127.0.0.1:8083 -> 8083
I0525 09:30:08.390631    6651 portforward.go:213] Forwarding from [::1]:8083 -> 8083

We are forwarding the pod’s port 8083 to our local machine’s 8083. Now if we hit http://localhost:8083 it will be directed to the pod which runs our Kafka Connect and Debezium services.
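
A quick way to check that the forwarding works is to hit Kafka Connect's REST API, which reports its version information at the root path and an (initially empty) list of connectors:

$ curl -s http://localhost:8083/
$ curl -s http://localhost:8083/connectors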

Since it may be useful to watch the pod's output for any exceptions, start another terminal and type the following to follow the Kafka Connect output:

$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl logs -f $CONNECT_POD_NAME

Now, let’s use an HTTP client to post the Debezium Connector/Task to the endpoint we’ve just exposed locally:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "replicator",
    "database.password": "replpass",
    "database.server.id": "184054",
    "database.server.name": "mysql-server-1",
    "database.binlog": "mysql-bin.000001",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'

If we’re watching the log output for the connect-mysql pod, we’ll see it eventually end up looking something like this:

2016-05-27 18:50:14,580 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 2 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,690 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 3 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,911 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 7 : {mysql-server-1.inventory.products_on_hand=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,136 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 10 : {mysql-server-1.inventory.customers=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,362 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 13 : {mysql-server-1.inventory.orders=LEADER_NOT_AVAILABLE}

These errors are just Kafka's way of telling us that the topics didn't exist yet but were created automatically.

If we now do a listing of our topics inside Kafka, we should see a Kafka topic for each table in the MySQL inventory database:

$ kubectl exec  $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
my-connect-configs
my-connect-offsets
mysql-server-1.inventory.customers
mysql-server-1.inventory.orders
mysql-server-1.inventory.products
mysql-server-1.inventory.products_on_hand
schema-changes.inventory

Let’s take a look at what’s in one of these topics:

$ kubectl exec  $KAFKA_POD_NAME --  /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --topic mysql-server-1.inventory.customers --from-beginning --property print.key=true
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1001}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1002}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1003}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1004}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}}

What happened? When we started Debezium's MySQL connector, it started reading the binary replication log from the MySQL server, and it replayed all of the history and generated an event for each INSERT, UPDATE, and DELETE operation (though in our sample inventory database we only had INSERTs). If we or some client apps were to commit other changes to the database, Debezium would see those immediately and write them to the correct topic. In other words, Debezium records all of the changes to our MySQL database as events in Kafka topics! And from there, any tool, connector, or service can independently consume those event streams from Kafka and process them or put them into a different database, into Hadoop, Elasticsearch, a data grid, etc.

Cleanup

If you want to delete the connector, simply issue a REST request to remove it:

curl -i -X DELETE -H "Accept:application/json" http://localhost:8083/connectors/inventory-connector
