Debezium on Kubernetes

Our Debezium Tutorial walks you step by step through using Debezium by installing, starting, and linking together all of the Docker containers running on a single host machine. Of course, you can use things like Docker Compose or your own scripts to make this easier, although that would just automate running all of the containers on a single machine. What you really want is to run the containers on a cluster of machines. In this blog, we’ll run Debezium using Kubernetes, a container cluster manager from Red Hat and Google.

Kubernetes is a container (Docker/Rocket/Hyper.sh) cluster management tool. Like many other popular cluster management and compute resource scheduling platforms, Kubernetes' roots are in Google, who is no stranger to running containers at scale. They start, stop, and cluster 2 billion containers per week and they contributed a lot of the Linux kernel underpinnings that make containers possible. One of their famous papers talks about an internal cluster manager named Borg. With Kubernetes, Google got tired of everyone implementing their papers in Java so they decided to implement this one themselves :)

Kubernetes is written in Go and is quickly becoming the de facto API for scheduling, managing, and clustering containers at scale. This blog isn’t intended to be a primer on Kubernetes, so we recommend heading over to the Getting Started docs to learn more about Kubernetes.

Getting started

To get started, we need access to a Kubernetes cluster. Getting one started is pretty easy: just follow the getting started guides. A favorite of ours is OpenShift’s all-in-one VM or the Red Hat Container Development Kit, both of which provide a hardened, production-ready distribution of Kubernetes. Once you’ve installed it and logged in, you should be able to run kubectl get pod to get a list of the Kubernetes pods you may have running. You don’t need anything else running inside Kubernetes to get started.

To get and build the Kubernetes manifest files (YAML descriptors), clone the Debezium Kubernetes repo and run the following commands:

$ mvn clean
$ mvn install

This project uses the awesome Fabric8 Maven plugin to automatically generate the Kubernetes manifest files. Here’s an example of what gets generated in $PROJECT_ROOT/zk-standalone/target/classes/kubernetes.yml

apiVersion: "v1"
items:
- apiVersion: "v1"
  kind: "Service"
  metadata:
    annotations: {}
    labels:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    name: "zookeeper"
  spec:
    deprecatedPublicIPs: []
    externalIPs: []
    ports:
    - port: 2181
      protocol: "TCP"
      targetPort: 2181
    selector:
      project: "zookeeper"
      provider: "debezium"
      group: "io.debezium"
- apiVersion: "v1"
  kind: "ReplicationController"
  metadata:
    annotations:
      fabric8.io/git-branch: "master"
      fabric8.io/git-commit: "004e222462749fbaf12c3ee33edca9b077ee9003"
    labels:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    name: "zk-standalone"
  spec:
    replicas: 1
    selector:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    template:
      metadata:
        annotations: {}
        labels:
          project: "zookeeper"
          provider: "debezium"
          version: "0.1-SNAPSHOT"
          group: "io.debezium"
      spec:
        containers:
        - args: []
          command: []
          env:
          - name: "KUBERNETES_NAMESPACE"
            valueFrom:
              fieldRef:
                fieldPath: "metadata.namespace"
          image: "docker.io/debezium/zookeeper:0.1"
          imagePullPolicy: "IfNotPresent"
          name: "zk-standalone"
          ports:
          - containerPort: 3888
            name: "election"
          - containerPort: 2888
            name: "peer"
          - containerPort: 2181
            name: "client"
          securityContext: {}
          volumeMounts: []
        imagePullSecrets: []
        nodeSelector: {}
        volumes: []
kind: "List"

Starting Zookeeper and Kafka on Kubernetes

To start Apache Zookeeper or Apache Kafka inside Kubernetes you have two options. If you have the kubectl command line (or the oc tool from the OpenShift client distros) on your machine you can apply any of the newly generated Kubernetes manifest files like this:

$ kubectl create -f <path_to_file>

Or you can use the Fabric8 Maven plugin and its fabric8:apply goal to apply the manifest files. Note that for either of these two options to work, you must be currently logged into your Kubernetes cluster. (OpenShift’s oc login <url> makes this super easy, or see Logging into a Kubernetes Cluster with kubectl for more information.)

First, let’s deploy Zookeeper to our Kubernetes cluster. We need to be in the $PROJECT_ROOT/zk-standalone directory, and then we’ll apply our Kubernetes configuration. Here’s how to do this with the kubectl command:

$ cd zk-standalone
$ kubectl create -f target/classes/kubernetes.yml

service "zookeeper" created
replicationcontroller "zk-standalone" created

You can do the same thing with Maven and the fabric8 maven plugin:

$ cd zk-standalone
$ mvn fabric8:apply

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building zk-standalone 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ zk-standalone ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/zk-standalone/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name zookeeper
[INFO] Created Service: zk-standalone/target/fabric8/applyJson/ticket/service-zookeeper.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name zk-standalone
[INFO] Created ReplicationController: zk-standalone/target/fabric8/applyJson/ticket/replicationcontroller-zk-standalone.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.661 s
[INFO] Finished at: 2016-05-19T15:59:26-07:00
[INFO] Final Memory: 26M/260M
[INFO] ------------------------------------------------------------------------

Zookeeper is deployed, so let’s continue with deploying Kafka. Navigate to $PROJECT_ROOT/kafka, and then apply the Kafka deployment configuration:

$ cd ../kafka
$ kubectl create -f target/classes/kubernetes.yml

service "kafka" created
replicationcontroller "kafka" created

Or with the fabric8 maven plugin:

$ cd ../kafka
$ mvn fabric8:apply

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ kafka ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/kafka/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name kafka
[INFO] Created Service: kafka/target/fabric8/applyJson/ticket/service-kafka.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name kafka
[INFO] Created ReplicationController: kafka/target/fabric8/applyJson/ticket/replicationcontroller-kafka.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.563 s
[INFO] Finished at: 2016-05-19T16:03:25-07:00
[INFO] Final Memory: 26M/259M
[INFO] ------------------------------------------------------------------------

Use the kubectl get pod command to see what is running:

$ kubectl get pod

NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          46s
zk-standalone-4mo02   1/1       Running   0          4m

Did you notice that we didn’t manually "link" the containers as we started them? Kubernetes has a cluster-wide service discovery feature called Kubernetes Services that load-balances requests across pods and lets us use internal DNS names (or cluster IPs) to discover them. For example, in the kubernetes.yml deployment configuration for Kafka, you’ll see the following:

    ...
    containers:
    - args: []
      command: []
      env:
      - name: "KAFKA_ADVERTISED_PORT"
        value: "9092"
      - name: "KAFKA_ADVERTISED_HOST_NAME"
        value: "kafka"
      - name: "KAFKA_ZOOKEEPER_CONNECT"
        value: "zookeeper:2181"
      - name: "KAFKA_PORT"
        value: "9092"
      - name: "KUBERNETES_NAMESPACE"
        valueFrom:
          fieldRef:
            fieldPath: "metadata.namespace"
      image: "docker.io/debezium/kafka:0.1"
      imagePullPolicy: "IfNotPresent"
      name: "kafka"
    ...

We’re specifying values for the KAFKA_ZOOKEEPER_CONNECT environment variable used by the Docker image, thus enabling Kafka to discover Zookeeper pods wherever they are running. Although we could have used any hostname, to keep things simple we use just zookeeper for the DNS name. So, if you were to log in to one of the pods and try to reach the host named zookeeper, Kubernetes would transparently resolve that request to one of the Zookeeper pods (if there are multiple). Slick! This discovery mechanism is used for the rest of the components, too. (Note that the cluster IP this DNS name resolves to never changes for the life of the Kubernetes Service, regardless of how many Pods back it. This means you can rely on this service discovery without the DNS caching issues you might otherwise run into.)
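
To make this concrete, here is a minimal Java sketch of what a client running in another pod in the same namespace could do; the class name is ours, and the only assumptions are the zookeeper Service name and client port 2181 from the manifest above:

import java.net.InetAddress;
import java.net.Socket;

public class ServiceDiscoveryCheck {
    public static void main(String[] args) throws Exception {
        // The "zookeeper" Service name is resolved by the cluster DNS to the
        // Service's stable cluster IP (not to an individual pod IP).
        InetAddress address = InetAddress.getByName("zookeeper");
        System.out.println("zookeeper resolves to " + address.getHostAddress());

        // Connecting to zookeeper:2181 is transparently routed to one of the
        // Zookeeper pods backing the Service.
        try (Socket socket = new Socket("zookeeper", 2181)) {
            System.out.println("connected to " + socket.getRemoteSocketAddress());
        }
    }
}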

The next step is to create a schema-changes topic that Debezium’s MySQL connector will use. Let’s use the Kafka tools to create this:

$ KAFKA_POD_NAME=$(kubectl get pod | grep -i running | grep kafka | awk '{ print $1 }')

$ kubectl exec $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic schema-changes.inventory

Start up a MySQL Database on Kubernetes

Starting the MySQL database follows the same steps as deploying Zookeeper or Kafka. We will navigate to the $PROJECT_ROOT/mysql56 directory, and we’ll use the MySQL 5.6 OpenShift Docker image so that it runs on both vanilla Kubernetes and OpenShift v3.x. Here’s the kubectl command to start up our MySQL instance:

$ cd ../mysql56
$ kubectl create -f target/classes/kubernetes.yml

service "mysql" created
replicationcontroller "mysql56" created

Or the equivalent Maven command:

$ cd ../mysql56
$ mvn fabric8:apply

Now, when we run kubectl get pod we should see our MySQL database running, too:

NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          17m
mysql56-b4f36         1/1       Running   0          9m
zk-standalone-4mo02   1/1       Running   0          21m

Let’s run a command to get client access to the database. First, set a couple of environment variables to hold the pod’s name and IP address:

$ MYSQL_POD_NAME=$(kubectl get pod | grep Running | grep ^mysql | awk '{ print $1 }')
$ MYSQL_POD_IP=$(kubectl describe pod $MYSQL_POD_NAME | grep IP | awk '{ print $2 }')

Then, log in to the Kubernetes pod that’s running the MySQL database and start the MySQL command-line client:

$ kubectl exec -it $MYSQL_POD_NAME   -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin
Warning: Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 1
Server version: 5.6.26-log MySQL Community Server (GPL)

Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

This shows that the kubectl command line lets us easily get access to a pod or Docker container regardless of where it’s running in the cluster.

Next, exit out of the mysql shell (type exit) and run the following command to download a SQL script that populates an inventory sample database:

$ kubectl exec  -it $MYSQL_POD_NAME -- bash -c "curl -s -L https://gist.github.com/christian-posta/e20ddb5c945845b4b9f6eba94a98af09/raw | /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin"

Now, if we log back into the MySQL pod we can show the databases and tables:

$ kubectl exec -it $MYSQL_POD_NAME   -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin -e 'use inventory; show tables;'

+---------------------+
| Tables_in_inventory |
+---------------------+
| customers           |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
4 rows in set (0.00 sec)

Start Kafka Connect and Debezium

Navigate into the $PROJECT_ROOT/connect-mysql directory. Here, we’ll start a Kubernetes pod that runs Kafka Connect with the Debezium MySQL connector already installed. The Debezium MySQL connector connects to a MySQL database, reads the binlog, and writes those row events to Kafka. Start up Kafka Connect with Debezium on Kubernetes just like the previous components:

$ cd ../connect-mysql
$ kubectl create -f target/classes/kubernetes.yml

service "connect-mysql" created
replicationcontroller "connect-mysql" created

Or with the fabric8 maven plugin:

$ cd ../connect-mysql
$ mvn fabric8:apply
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building connect-mysql 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ connect-mysql ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/connect-mysql/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name connect-mysql
[INFO] Created Service: connect-mysql/target/fabric8/applyJson/ticket/service-connect-mysql.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name connect-mysql
[INFO] Created ReplicationController: connect-mysql/target/fabric8/applyJson/ticket/replicationcontroller-connect-mysql.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.255 s
[INFO] Finished at: 2016-05-25T09:21:04-07:00
[INFO] Final Memory: 27M/313M
[INFO] ------------------------------------------------------------------------

Just like in the Docker tutorial for Debezium, we now want to send a JSON object to the Kafka Connect API to start up our Debezium connector. First, we need to expose the API for the Kafka Connect cluster. You can do this however you want: on Kubernetes with Ingress definitions, NodePort services, etc., or on OpenShift with OpenShift Routes. For this simple example, we’ll use simple Pod port-forwarding to forward the connect-mysql pod’s 8083 port to our local machine (again, regardless of where the Pod is actually running in the cluster). This is an incredible feature of Kubernetes that makes it so easy to develop distributed services!

Let’s determine the pod name and then use port forwarding to our local machine:

$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl port-forward $CONNECT_POD_NAME 8083:8083

I0525 09:30:08.390491    6651 portforward.go:213] Forwarding from 127.0.0.1:8083 -> 8083
I0525 09:30:08.390631    6651 portforward.go:213] Forwarding from [::1]:8083 -> 8083

We are forwarding the pod’s port 8083 to our local machine’s port 8083. Now, if we hit http://localhost:8083, the request will be directed to the pod that runs our Kafka Connect and Debezium services.
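
Before registering a connector, it can be handy to sanity-check the tunnel: Kafka Connect’s REST API answers GET /connectors with the (initially empty) list of registered connectors. Here is a small Java sketch using the JDK’s HttpURLConnection; a plain curl against the same URL works just as well, and the class name is ours:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ConnectApiCheck {
    public static void main(String[] args) throws Exception {
        // GET /connectors lists the connectors registered with Kafka Connect; an
        // empty JSON array ("[]") means the port-forward works and no connectors
        // have been created yet.
        URL url = new URL("http://localhost:8083/connectors");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            System.out.println(connection.getResponseCode() + " " + reader.readLine());
        }
    }
}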

Since it may be useful to watch the pod’s output for any exceptions, start another terminal and type the following to follow the Kafka Connect output:

$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl logs -f $CONNECT_POD_NAME

Now, let’s use an HTTP client to post the Debezium Connector/Task to the endpoint we’ve just exposed locally:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "replicator", "database.password": "replpass", "database.server.id": "184054", "database.server.name": "mysql-server-1", "database.binlog": "mysql-bin.000001", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory" } }'

If we’re watching the log output for the connect-mysql pod, we’ll see it eventually end up looking something like this:

2016-05-27 18:50:14,580 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 2 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,690 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 3 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,911 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 7 : {mysql-server-1.inventory.products_on_hand=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,136 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 10 : {mysql-server-1.inventory.customers=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,362 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 13 : {mysql-server-1.inventory.orders=LEADER_NOT_AVAILABLE}

These errors are just Kafka’s way of telling us that the topics didn’t exist yet and were being created.

If we now do a listing of our topics inside Kafka, we should see a Kafka topic for each table in the MySQL inventory database:

$ kubectl exec  $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
my-connect-configs
my-connect-offsets
mysql-server-1.inventory.customers
mysql-server-1.inventory.orders
mysql-server-1.inventory.products
mysql-server-1.inventory.products_on_hand
schema-changes.inventory

Let’s take a look at what’s in one of these topics:

$ kubectl exec  $KAFKA_POD_NAME --  /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --topic mysql-server-1.inventory.customers --from-beginning --property print.key=true
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1001}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1002}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1003}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1004}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}}

What happened? When we started Debezium’s MySQL connector, it started reading the binary replication log from the MySQL server, and it replayed all of the history and generated an event for each INSERT, UPDATE, and DELETE operation (though in our sample inventory database we only had INSERTs). If we or some client apps were to commit other changes to the database, Debezium would see those immediately and write them to the correct topic. In other words, Debezium records all of the changes to our MySQL database as events in Kafka topics! And from there, any tool, connector, or service can independently consume those event streams from Kafka and process them or put them into a different database, into Hadoop, Elasticsearch, a data grid, etc.
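
Because the change events are ordinary messages on Kafka topics, any standard Kafka consumer can read them. Here is a hedged sketch of a downstream service consuming the customers change stream; the consumer group id is made up, the topic name and kafka:9092 bootstrap address come from the steps above, and it assumes a reasonably recent Kafka client library:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomerChangeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");   // the Kafka Service's DNS name inside the cluster
        props.put("group.id", "inventory-consumer");    // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");     // replay the topic from the beginning

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mysql-server-1.inventory.customers"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}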

Cleanup

If you want to delete the connector, simply issue a REST request to remove it:

curl -i -X DELETE -H "Accept:application/json" http://localhost:8083/connectors/inventory-connector

Parsing DDL

When our MySQL connector is reading the binlog of a MySQL server or cluster, it parses the DDL statements in the log and builds an in-memory model of each table’s schema as it evolves over time. This process is important because the connector generates events for each table using the definition of the table at the time of each event. We can’t use the database’s current schema, since it may have changed since the point in time (or position in the log) where the connector is reading.

Parsing the DDL of MySQL or any other major relational database can seem to be a daunting task. Usually each DBMS has a highly customized SQL grammar, and although the data manipulation language (DML) statements are often fairly close to the standard, the data definition language (DDL) statements are usually less so and involve more DBMS-specific features.

So given this, why did we write our own DDL parser for MySQL? Let’s first look at what Debezium needs a DDL parser to do.

Parsing DDL in the Debezium MySQL connector

The MySQL binlog contains various kinds of events. For example, when a row is inserted into a table, the binlog event contains an indirect reference to the table and the values for each column in the table, but there is no information about the columns that make up the table. The only things in the binlog that reference table structures are the SQL DDL statements that were generated by MySQL when it processed user-supplied DDL statements.

The connector also produces messages using Kafka Connect Schemas, which are simple data structures that define the name and type of each field and the way the fields are organized. So, when we generate an event message for a table insert, we first have to have a Kafka Connect Schema object with all the appropriate fields, and then we have to convert the ordered array of column values into a Kafka Connect Struct object using those fields and the individual column values in the table insert event.
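
For example, for the customers table used later in this post, the Schema and a corresponding Struct could be built roughly like this (a hand-written sketch using the Kafka Connect data API, not the connector’s actual code):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class CustomerEventExample {
    public static void main(String[] args) {
        // Value schema mirroring the inventory.customers table definition.
        Schema valueSchema = SchemaBuilder.struct()
                .name("inventory.customers")
                .field("id", Schema.INT32_SCHEMA)
                .field("first_name", Schema.STRING_SCHEMA)
                .field("last_name", Schema.STRING_SCHEMA)
                .field("email", Schema.STRING_SCHEMA)
                .build();

        // One event value, built from a row's column values.
        Struct value = new Struct(valueSchema)
                .put("id", 1001)
                .put("first_name", "Sally")
                .put("last_name", "Thomas")
                .put("email", "sally.thomas@acme.com");

        System.out.println(value);
    }
}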

Luckily, when we come across a DDL statement we can update our in-memory model and then use this to generate a Schema object. At the same time, we can create a component that will use this Schema object to create a Struct object from the ordered array of column values that appear in the events. All of this can be done once and used for all row events on that table, until we come across another DDL statement that changes the table’s schema, at which point we update our model again.

So all of this requires parsing all of the DDL statements, though for our purposes we only have to understand a small subset of the DDL grammar. We then have to use that subset of statements to update our in-memory model of our tables. And since our in-memory table model is not specific to MySQL, the rest of the functionality to generate Schema objects and the components that convert an array of values into the Struct objects used in messages is all generic.
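
The generic conversion step, turning an ordered array of column values into a Struct for the current Schema, boils down to something like the following sketch (the helper name is ours, not Debezium’s; it again uses the Kafka Connect data API):

import java.util.List;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

public class RowConverter {
    // Converts an ordered array of column values (as found in a binlog row event)
    // into a Struct, assuming the i-th value belongs to the i-th field of the schema.
    public static Struct toStruct(Schema schema, Object[] columnValues) {
        Struct struct = new Struct(schema);
        List<Field> fields = schema.fields();
        for (int i = 0; i != fields.size(); ++i) {
            struct.put(fields.get(i).name(), columnValues[i]);
        }
        return struct;
    }
}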

Existing DDL libraries

Unfortunately, there aren’t really that many third-party open source libraries for parsing DDL statements for MySQL, PostgreSQL, or other popular RDBMSes. JSqlParser is often cited, but it has a single grammar that is a combination of multiple DBMS grammars and therefore is not a strict parser for any specific DBMS. Adding support for other DBMSes by updating the composite grammar would likely be difficult.

Other libraries, such as PrestoDB, define their own SQL grammar and are unable to handle the intricacies and nuances of the MySQL DDL grammar. The Antlr parser generator project has a grammar for MySQL 5.6, but this is limited to a small subset of DML and has no support for DDL or newer 5.7 features. There are older SQL-related grammars for Antlr 3, but these are often massive, suffer from bugs, and are limited to specific DBMSes. The Teiid project is a data virtualization engine that sits atop a wide variety of DBMSes and data sources, and its tooling has a series of DDL parsers that construct ASTs in a special repository (the author actually helped develop these). There are also Ruby libraries, like Square’s MySQL Parser library. There is also a proprietary commercial product.

Our DDL parser framework

Since we couldn’t find a useful third-party open source library, we chose to create our own DDL parser framework limited to our needs:

  • Parse DDL statements and update our in-memory model.

  • Focus on consuming those essential statements (e.g., create, alter, and drop tables and views), while completely ignoring other statements without having to parse them.

  • Structure the parser code similarly to the MySQL DDL grammar documentation and use method names that mirror the rules in the grammar. This will make it easier to maintain over time.

  • Allow creation of parsers for PostgreSQL, Oracle, SQLServer, and other DBMSes as needed.

  • Support customization through subclassing: be able to easily override narrow portions of the logic without having to copy lots of code.

  • Make it easy to develop, debug, and test parsers.

The resulting framework includes a tokenizer that converts one or more DDL statements in a string into a rewindable sequence of tokens, where each token represents punctuation, quoted strings, case-insensitive words and symbols, numbers, keywords, comments, and terminating characters (such as ; for MySQL). The DDL parser then walks the token stream looking for patterns using a simple and easy-to-read fluent API, calling methods on itself to process the various sets of tokens. The parser also uses an internal data type parser for processing SQL data type expressions, such as INT, VARCHAR(64), NUMERIC(32,3), or TIMESTAMP(8) WITH TIME ZONE.

The MySqlDdlParser class extends a base class and provides all of the MySQL-specific parsing logic. For example, the DDL statements:

# Create and populate our products using a single insert with many rows
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;

# Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
  product_id INTEGER NOT NULL PRIMARY KEY,
  quantity INTEGER NOT NULL,
  FOREIGN KEY (product_id) REFERENCES products(id)
);

can be easily parsed with:

String ddlStatements = ...
DdlParser parser = new MySqlDdlParser();
Tables tables = new Tables();
parser.parse(ddlStatements, tables);

Here, the Tables object is our in-memory representation of our named table definitions. The parser processes the DDL statements, applying each to the appropriate table definition inside the Tables object.

How it works

Each DdlParser implementation has the following public method that will parse the statements in the supplied String:

    public final void parse(String ddlContent, Tables databaseTables) {
        Tokenizer tokenizer = new DdlTokenizer(!skipComments(), this::determineTokenType);
        TokenStream stream = new TokenStream(ddlContent, tokenizer, false);
        stream.start();
        parse(stream, databaseTables);
    }

Here, the method creates a new TokenStream from the content using a DdlTokenizer that knows how to separate the characters in the string into the various typed token objects. It then calls another parse method that does the bulk of the work:

    public final void parse(TokenStream ddlContent, Tables databaseTables)
                           throws ParsingException, IllegalStateException {
        this.tokens = ddlContent;
        this.databaseTables = databaseTables;
        Marker marker = ddlContent.mark();
        try {
            while (ddlContent.hasNext()) {
                parseNextStatement(ddlContent.mark());
                // Consume the statement terminator if it is still there ...
                tokens.canConsume(DdlTokenizer.STATEMENT_TERMINATOR);
            }
        } catch (ParsingException e) {
            ddlContent.rewind(marker);
            throw e;
        } catch (Throwable t) {
            parsingFailed(ddlContent.nextPosition(),
                          "Unexpected exception (" + t.getMessage() + ") parsing", t);
        }
    }

This sets up some local state, marks the current starting point, and tries to parse DDL statements until no more can be found. If the parsing logic fails to find a match, it generates a ParsingException with the offending line and column plus a message signaling what was found and what was expected. In such cases, this method rewinds the token stream (in case the caller wishes to try a different parser).

Each time the parseNextStatement method is called, it is passed a marker identifying the starting position of that statement. Our MySqlDdlParser subclass overrides the parseNextStatement method to use the first token in the statement to determine which kinds of statements are allowed in the MySQL DDL grammar:

    @Override
    protected void parseNextStatement(Marker marker) {
        if (tokens.matches(DdlTokenizer.COMMENT)) {
            parseComment(marker);
        } else if (tokens.matches("CREATE")) {
            parseCreate(marker);
        } else if (tokens.matches("ALTER")) {
            parseAlter(marker);
        } else if (tokens.matches("DROP")) {
            parseDrop(marker);
        } else if (tokens.matches("RENAME")) {
            parseRename(marker);
        } else {
            parseUnknownStatement(marker);
        }
    }

When a matching token is found, the method calls the appropriate method. For example, if the statement begins with CREATE TABLE …​, then the parseCreate method is called with the same marker that identifies the starting position of the statement:

    @Override
    protected void parseCreate(Marker marker) {
        tokens.consume("CREATE");
        if (tokens.matches("TABLE") || tokens.matches("TEMPORARY", "TABLE")) {
            parseCreateTable(marker);
        } else if (tokens.matches("VIEW")) {
            parseCreateView(marker);
        } else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("EVENT")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("FUNCTION", "PROCEDURE")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("UNIQUE", "FULLTEXT", "SPATIAL", "INDEX")) {
            parseCreateIndex(marker);
        } else if (tokens.matchesAnyOf("SERVER")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("TABLESPACE")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("TRIGGER")) {
            parseCreateUnknown(marker);
        } else {
            // It could be several possible things (including more
            // elaborate forms of those matches tried above),
            sequentially(this::parseCreateView,
                         this::parseCreateUnknown);
        }
    }

Here, the method first consumes the token with the CREATE literal, and then tries to match the tokens with various patterns of token literals. If a match is found, this method delegates to other more specific parsing methods. Note how the fluent API of the framework makes it quite easy to understand the match patterns.

Let’s go one step further. Assuming our DDL statement starts with CREATE TABLE products (, then the parser will then invoke the parseCreateTable method, again with the same marker denoting the start of the statement:

    protected void parseCreateTable(Marker start) {
        tokens.canConsume("TEMPORARY");
        tokens.consume("TABLE");
        boolean onlyIfNotExists = tokens.canConsume("IF", "NOT", "EXISTS");
        TableId tableId = parseQualifiedTableName(start);
        if ( tokens.canConsume("LIKE")) {
            TableId originalId = parseQualifiedTableName(start);
            Table original = databaseTables.forTable(originalId);
            if ( original != null ) {
                databaseTables.overwriteTable(tableId, original.columns(),
                                              original.primaryKeyColumnNames());
            }
            consumeRemainingStatement(start);
            debugParsed(start);
            return;
        }
        if (onlyIfNotExists && databaseTables.forTable(tableId) != null) {
            // The table does exist, so we should do nothing ...
            consumeRemainingStatement(start);
            debugParsed(start);
            return;
        }
        TableEditor table = databaseTables.editOrCreateTable(tableId);

        // create_definition ...
        if (tokens.matches('(')) parseCreateDefinitionList(start, table);
        // table_options ...
        parseTableOptions(start, table);
        // partition_options ...
        if (tokens.matches("PARTITION")) {
            parsePartitionOptions(start, table);
        }
        // select_statement
        if (tokens.canConsume("AS") || tokens.canConsume("IGNORE", "AS")
            || tokens.canConsume("REPLACE", "AS")) {
            parseAsSelectStatement(start, table);
        }

        // Update the table definition ...
        databaseTables.overwriteTable(table.create());
        debugParsed(start);
    }

This method tries to mirror the MySQL CREATE TABLE grammar rules, which start with:

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    (create_definition,...)
    [table_options]
    [partition_options]

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    [(create_definition,...)]
    [table_options]
    [partition_options]
    select_statement

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    { LIKE old_tbl_name | (LIKE old_tbl_name) }

create_definition:
    ...

The CREATE literal was already consumed before our parseCreateTable begins, so it first tries to consume the TEMPORARY literal if present, then the TABLE literal, then the IF NOT EXISTS fragment if present, and then consumes and parses the qualified name of the table. If the statement includes LIKE otherTable, it uses the databaseTables (which is the reference to our Tables object) to overwrite the definition of the named table with that of the referenced table. Otherwise, it obtains an editor for the new table, and then (like the grammar rules) parses a list of create_definition fragments, followed by table_options, partition_options, and possibly a select_statement.

Take a look at the full MySqlDdlParser class to see far more details.

Wrap up

This post goes into some detail about why the MySQL connector uses the DDL statements in the binlog, though we’ve only scratched the surface of how the connector does the DDL parsing with its framework, and how that framework can be reused in future parsers for other DBMS dialects.

Try our tutorial to see the MySQL connector in action, and stay tuned for more connectors, releases, and news.

