Update (Oct. 11 2019): An alternative, and much simpler, approach for running Debezium (and Apache Kafka and Kafka Connect in general) on Kubernetes is to use a K8s operator such as Strimzi. You can find instructions for the set-up of Debezium on OpenShift here, and similar steps apply for plain Kubernetes.

Our Debezium Tutorial walks you step by step through using Debezium by installing, starting, and linking together all of the Docker containers running on a single host machine. Of course, you can use things like Docker Compose or your own scripts to make this easier, although that would just be automating the process of running all the containers on a single machine. What you really want is to run the containers on a cluster of machines. In this blog post, we’ll run Debezium using Kubernetes, a container cluster manager from Red Hat and Google.

Kubernetes is a container (Docker/Rocket/Hyper.sh) cluster management tool. Like many other popular cluster management and compute resource scheduling platforms, Kubernetes has its roots at Google, which is no stranger to running containers at scale. Google starts, stops, and clusters 2 billion containers per week, and contributed a lot of the Linux kernel underpinnings that make containers possible. One of its famous papers describes an internal cluster manager named Borg. With Kubernetes, Google got tired of everyone implementing its papers in Java, so it decided to implement this one itself :)

Kubernetes is written in Go and is quickly becoming the de facto API for scheduling, managing, and clustering containers at scale. This blog post isn’t intended to be a primer on Kubernetes, so we recommend heading over to the Getting Started docs to learn more about it.

Getting started

To get started, we need access to a Kubernetes cluster. Getting one started is pretty easy: just follow the getting started guides. A favorite of ours is OpenShift’s all-in-one VM or the Red Hat Container Development Kit, both of which provide a hardened, production-ready distribution of Kubernetes. Once you’ve installed it and logged in, you should be able to run kubectl get pod to get a list of the Kubernetes pods you may have running. You don’t need anything else running inside Kubernetes to get started.
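
On a fresh cluster with nothing deployed yet, the command simply reports that there is nothing to show (the exact wording depends on your kubectl version):

$ kubectl get pod

No resources found.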

To get and build the Kubernetes manifest files (YAML descriptors), clone the Debezium Kubernetes repo and run the following commands:

$ mvn clean
$ mvn install

This project uses the awesome Fabric8 Maven plugin to automatically generate the Kubernetes manifest files. Here’s an example of what gets generated in $PROJECT_ROOT/zk-standalone/target/classes/kubernetes.yml:

apiVersion: "v1"
items:
- apiVersion: "v1"
  kind: "Service"
  metadata:
    annotations: {}
    labels:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    name: "zookeeper"
  spec:
    deprecatedPublicIPs: []
    externalIPs: []
    ports:
    - port: 2181
      protocol: "TCP"
      targetPort: 2181
    selector:
      project: "zookeeper"
      provider: "debezium"
      group: "io.debezium"
- apiVersion: "v1"
  kind: "ReplicationController"
  metadata:
    annotations:
      fabric8.io/git-branch: "master"
      fabric8.io/git-commit: "004e222462749fbaf12c3ee33edca9b077ee9003"
    labels:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    name: "zk-standalone"
  spec:
    replicas: 1
    selector:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    template:
      metadata:
        annotations: {}
        labels:
          project: "zookeeper"
          provider: "debezium"
          version: "0.1-SNAPSHOT"
          group: "io.debezium"
      spec:
        containers:
        - args: []
          command: []
          env:
          - name: "KUBERNETES_NAMESPACE"
            valueFrom:
              fieldRef:
                fieldPath: "metadata.namespace"
          image: "docker.io/debezium/zookeeper:0.1"
          imagePullPolicy: "IfNotPresent"
          name: "zk-standalone"
          ports:
          - containerPort: 3888
            name: "election"
          - containerPort: 2888
            name: "peer"
          - containerPort: 2181
            name: "client"
          securityContext: {}
          volumeMounts: []
        imagePullSecrets: []
        nodeSelector: {}
        volumes: []
kind: "List"

Starting Zookeeper and Kafka on Kubernetes

To start Apache Zookeeper or Apache Kafka inside Kubernetes you have two options. If you have the kubectl command-line tool (or the oc tool from the OpenShift client distribution) on your machine, you can apply any of the newly generated Kubernetes manifest files like this:

$ kubectl create -f <path_to_file>

Or you can use the Fabric8 Maven plugin and its fabric8:apply goal to apply the manifest files. Note that for either of these options to work, you must be logged into your Kubernetes cluster. (OpenShift’s oc login <url> makes this super easy; see Logging into a Kubernetes Cluster with kubectl for more information.)
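
For example, logging into the all-in-one VM used throughout this post looks something like this (the URL matches our cluster; substitute your own address and credentials):

$ oc login https://172.28.128.4:8443 -u admin -p admin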

Let’s start by deploying Zookeeper to our Kubernetes cluster. We need to be in the $PROJECT_ROOT/zk-standalone directory, and from there we’ll apply our Kubernetes configuration. First, here’s how to do it with the kubectl command:

$ cd zk-standalone
$ kubectl create -f target/classes/kubernetes.yml

service "zookeeper" created
replicationcontroller "zk-standalone" created

You can do the same thing with Maven and the fabric8 maven plugin:

$ cd zk-standalone
$ mvn fabric8:apply

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building zk-standalone 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ zk-standalone ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/zk-standalone/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name zookeeper
[INFO] Created Service: zk-standalone/target/fabric8/applyJson/ticket/service-zookeeper.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name zk-standalone
[INFO] Created ReplicationController: zk-standalone/target/fabric8/applyJson/ticket/replicationcontroller-zk-standalone.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.661 s
[INFO] Finished at: 2016-05-19T15:59:26-07:00
[INFO] Final Memory: 26M/260M
[INFO] ------------------------------------------------------------------------

Zookeeper is deployed, so let’s continue with deploying Kafka. Navigate to $PROJECT_ROOT/kafka, and then apply the Kafka deployment configuration:

$ cd ../kafka
$ kubectl create -f target/classes/kubernetes.yml

service "kafka" created
replicationcontroller "kafka" created

Or with the fabric8 maven plugin:

$ cd ../kafka
$ mvn fabric8:apply

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ kafka ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/kafka/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name kafka
[INFO] Created Service: kafka/target/fabric8/applyJson/ticket/service-kafka.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name kafka
[INFO] Created ReplicationController: kafka/target/fabric8/applyJson/ticket/replicationcontroller-kafka.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.563 s
[INFO] Finished at: 2016-05-19T16:03:25-07:00
[INFO] Final Memory: 26M/259M
[INFO] ------------------------------------------------------------------------

Use the kubectl get pod command to see what is running:

$ kubectl get pod

NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          46s
zk-standalone-4mo02   1/1       Running   0          4m

Did you notice that we didn’t manually "link" the containers as we started them? Kubernetes has a cluster service discovery feature called Kubernetes Services that load-balances across pods and lets us use internal DNS names (or cluster IPs) to discover them. For example, in the kubernetes.yml deployment configuration for Kafka, you’ll see the following:

    ...
    containers:
    - args: []
      command: []
      env:
      - name: "KAFKA_ADVERTISED_PORT"
        value: "9092"
      - name: "KAFKA_ADVERTISED_HOST_NAME"
        value: "kafka"
      - name: "KAFKA_ZOOKEEPER_CONNECT"
        value: "zookeeper:2181"
      - name: "KAFKA_PORT"
        value: "9092"
      - name: "KUBERNETES_NAMESPACE"
        valueFrom:
          fieldRef:
            fieldPath: "metadata.namespace"
      image: "docker.io/debezium/kafka:0.1"
      imagePullPolicy: "IfNotPresent"
      name: "kafka"
    ...

We’re specifying values for the KAFKA_ZOOKEEPER_CONNECT environment variable used by the Docker image, thus enabling Kafka to discover Zookeeper pods wherever they are running. Although we could have used any hostname, to keep things simple we use just zookeeper for the DNS name. So, if you were to log in to one of the pods and try to reach the host named zookeeper, Kubernetes would transparently resolve that request to one of the Zookeeper pods (if there are multiple). Slick! This discovery mechanism is used for the rest of the components, too. (Note that the cluster IP this DNS name resolves to never changes for the life of the Kubernetes Service, regardless of how many pods exist for a given service. This means you can rely on this service discovery without all of the DNS caching issues you might otherwise run into.)
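
You can see the stable cluster IPs behind these DNS names by listing the services. The exact IPs and columns will vary with your cluster and kubectl version; the output here is just indicative:

$ kubectl get svc

NAME        CLUSTER-IP       EXTERNAL-IP   PORT(S)    AGE
kafka       172.30.163.142   <none>        9092/TCP   5m
zookeeper   172.30.205.11    <none>        2181/TCP   9m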

The next step is to create a schema-changes topic that Debezium’s MySQL connector will use. Let’s use the Kafka tools to create this:

$ KAFKA_POD_NAME=$(kubectl get pod | grep -i running | grep kafka | awk '{ print $1 }')

$ kubectl exec $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic schema-changes.inventory
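
If you like, verify the topic with the same tool; it should report one partition with a replication factor of 1, matching what we asked for:

$ kubectl exec $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic schema-changes.inventory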

Start up a MySQL Database on Kubernetes

Starting the MySQL database follows the same pattern as deploying Zookeeper or Kafka. We’ll navigate to the $PROJECT_ROOT/mysql56 directory, and we’ll use the MySQL 5.6 OpenShift Docker image so that it runs on both vanilla Kubernetes and OpenShift v3.x. Here’s the kubectl command to start up our MySQL instance:

$ cd ../mysql56
$ kubectl create -f target/classes/kubernetes.yml

service "mysql" created
replicationcontroller "mysql56" created

Or the equivalent Maven command:

$ cd ../mysql56
$ mvn fabric8:apply

Now, when we run kubectl get pod we should see our MySQL database running, too:

NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          17m
mysql56-b4f36         1/1       Running   0          9m
zk-standalone-4mo02   1/1       Running   0          21m

Let’s run a command to get client access to the database. First, set a couple of environment variables to hold the pod’s name and IP address:

$ MYSQL_POD_NAME=$(kubectl get pod | grep Running | grep ^mysql | awk '{ print $1 }')
$ MYSQL_POD_IP=$(kubectl describe pod $MYSQL_POD_NAME | grep IP | awk '{ print $2 }')
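
(As an aside, if your kubectl is new enough to support jsonpath output, a less fragile way to capture the pod IP is:)

$ MYSQL_POD_IP=$(kubectl get pod $MYSQL_POD_NAME -o jsonpath='{.status.podIP}')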

Then, log in to the Kubernetes pod that’s running the MySQL database, and start the MySQL command client:

$ kubectl exec -it $MYSQL_POD_NAME   -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin
Warning: Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 1
Server version: 5.6.26-log MySQL Community Server (GPL)

Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

This shows that the kubectl command line lets us easily get access to a pod or Docker container regardless of where it’s running in the cluster.
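
For example, the same mechanism gives you an interactive shell inside any running pod:

$ kubectl exec -it $MYSQL_POD_NAME -- bash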

Next, exit out of the mysql shell (type exit) and run the following command to download a SQL script that populates an inventory sample database:

$ kubectl exec  -it $MYSQL_POD_NAME -- bash -c "curl -s -L https://gist.github.com/christian-posta/e20ddb5c945845b4b9f6eba94a98af09/raw | /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin"

Now, if we log back into the MySQL pod we can show the databases and tables:

$ kubectl exec -it $MYSQL_POD_NAME   -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin -e 'use inventory; show tables;'

+---------------------+
| Tables_in_inventory |
+---------------------+
| customers           |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
4 rows in set (0.00 sec)

Start Kafka Connect and Debezium

Navigate into the $PROJECT_ROOT/connect-mysql directory. Here, we’ll start a Kubernetes pod that runs Kafka Connect with the Debezium MySQL connector already installed. The Debezium MySQL connector connects to a MySQL database, reads the binlog, and writes those row-level change events to Kafka. Start up Kafka Connect with Debezium on Kubernetes just like the previous components:

$ cd ../connect-mysql
$ kubectl create -f target/classes/kubernetes.yml

service "connect-mysql" created
replicationcontroller "connect-mysql" created

Or with the fabric8 maven plugin:

$ cd ../connect-mysql
$ mvn fabric8:apply
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building connect-mysql 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ connect-mysql ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/connect-mysql/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name connect-mysql
[INFO] Created Service: connect-mysql/target/fabric8/applyJson/ticket/service-connect-mysql.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name connect-mysql
[INFO] Created ReplicationController: connect-mysql/target/fabric8/applyJson/ticket/replicationcontroller-connect-mysql.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.255 s
[INFO] Finished at: 2016-05-25T09:21:04-07:00
[INFO] Final Memory: 27M/313M
[INFO] ------------------------------------------------------------------------

Just like in the Docker tutorial for Debezium, we now want to send a JSON object to the Kafka Connect API to start up our Debezium connector. First, we need to expose the API for the Kafka Connect cluster. You can do this however you want: on Kubernetes with Ingress definitions, NodePort services, etc., or on OpenShift with OpenShift Routes. For this simple example, we’ll use simple pod port-forwarding to forward the connect-mysql pod’s port 8083 to our local machine, again regardless of where the pod is actually running in the cluster. (This is such an incredible feature of Kubernetes that makes it so easy to develop distributed services!)

Let’s determine the pod name and then use port forwarding to our local machine:

$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl port-forward $CONNECT_POD_NAME 8083:8083

I0525 09:30:08.390491    6651 portforward.go:213] Forwarding from 127.0.0.1:8083 -> 8083
I0525 09:30:08.390631    6651 portforward.go:213] Forwarding from [::1]:8083 -> 8083

We are forwarding the pod’s port 8083 to port 8083 on our local machine. Now if we hit http://localhost:8083, the request will be directed to the pod that runs our Kafka Connect and Debezium services.
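
As a quick sanity check, you can hit the root of the Kafka Connect REST API, which responds with its version information (your exact version and commit will differ from this illustrative output):

$ curl -H "Accept:application/json" http://localhost:8083/

{"version":"0.9.0.1","commit":"..."}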

Since it may be useful to see the output from the pod to see whether or not there are any exceptions, start another terminal and type the following to follow the Kafka Connect output:

$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl logs -f $CONNECT_POD_NAME

Now, let’s use an HTTP client to post the Debezium Connector/Task to the endpoint we’ve just exposed locally:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "replicator",
    "database.password": "replpass",
    "database.server.id": "184054",
    "database.server.name": "mysql-server-1",
    "database.binlog": "mysql-bin.000001",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'
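
If the POST succeeds, Kafka Connect responds with HTTP 201 and echoes back the connector configuration. You can also list the registered connectors at any time; assuming the port-forward is still active, the check looks like this:

$ curl -H "Accept:application/json" http://localhost:8083/connectors/

["inventory-connector"]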

If we’re watching the log output for the connect-mysql pod, we’ll see it eventually end up looking something like this:

2016-05-27 18:50:14,580 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 2 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,690 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 3 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,911 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 7 : {mysql-server-1.inventory.products_on_hand=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,136 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 10 : {mysql-server-1.inventory.customers=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,362 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 13 : {mysql-server-1.inventory.orders=LEADER_NOT_AVAILABLE}

These errors are just Kafka’s way of telling us that the topics didn’t exist yet and were created automatically on first use.

If we now list the topics inside Kafka, we should see a Kafka topic for each table in the MySQL inventory database:

$ kubectl exec  $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
my-connect-configs
my-connect-offsets
mysql-server-1.inventory.customers
mysql-server-1.inventory.orders
mysql-server-1.inventory.products
mysql-server-1.inventory.products_on_hand
schema-changes.inventory

Let’s take a look at what’s in one of these topics:

$ kubectl exec  $KAFKA_POD_NAME --  /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --topic mysql-server-1.inventory.customers --from-beginning --property print.key=true
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1001}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1002}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1003}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1004}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}}

What happened? When we started Debezium’s MySQL connector, it began reading the binary replication log from the MySQL server, replaying all of the history and generating an event for each INSERT, UPDATE, and DELETE operation (though in our sample inventory database we only had INSERTs). If we or some client apps were to commit other changes to the database, Debezium would see them immediately and write them to the correct topic. In other words, Debezium records all of the changes to our MySQL database as events in Kafka topics! From there, any tool, connector, or service can independently consume those event streams from Kafka and process them or put them into a different database, into Hadoop, Elasticsearch, a data grid, etc.
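
To see this live, keep the console consumer from the previous step running in one terminal and commit a change from another. The UPDATE below is just an illustrative edit to the sample data:

$ kubectl exec -it $MYSQL_POD_NAME -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin -e "UPDATE inventory.customers SET first_name='Anne Marie' WHERE id=1004;"

A new event for customer 1004 should show up on the mysql-server-1.inventory.customers topic almost immediately.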

Cleanup

If you want to delete the connector, simply issue a REST request to remove it:

curl -i -X DELETE -H "Accept:application/json" http://localhost:8083/connectors/inventory-connector
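
If you also want to tear down the pods and services themselves, delete each component’s manifests with kubectl, mirroring how we created them (run from each component’s directory):

$ kubectl delete -f target/classes/kubernetes.yml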

Christian Posta

Christian is a Principal Middleware Architect at Red Hat, and an enthusiast of open-source software, Apache, Cloud, Integration, Kubernetes, Docker, OpenShift, and Fabric8.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas about how we can improve Debezium, please let us know or log an issue.