Tutorial for Using Debezium Connectors With Apache Pulsar

This is a guest post by Apache Pulsar PMC Member and Committer Jia Zhai.

Debezium is an open source project for change data capture (CDC). It is built on Apache Kafka Connect and supports multiple databases, such as MySQL, MongoDB, PostgreSQL, Oracle, and SQL Server. Apache Pulsar includes a set of built-in connectors based on the Pulsar IO framework, which is the counterpart to Apache Kafka Connect.

As of version 2.3.0, Pulsar IO comes with support for the Debezium source connectors out of the box, so you can leverage Debezium to stream changes from your databases into Apache Pulsar. This tutorial walks you through setting up the Debezium connector for MySQL with Pulsar IO.

Tutorial Steps

This tutorial is similar to the Debezium tutorial, except that the event streams are stored in Pulsar instead of Kafka. It consists of six steps:

  1. Start a MySQL server.

  2. Start the standalone Pulsar service.

  3. Start the Debezium connector in Pulsar IO, which reads the existing database changes from the MySQL server.

  4. Subscribe to Pulsar topics to monitor MySQL changes.

  5. Make changes in the MySQL server and verify that they are recorded in Pulsar topics immediately.

  6. Clean up.

Step 1: Start a MySQL server

Start a MySQL server that contains an example database from which Debezium captures changes. Open a new terminal and start a container that runs a MySQL database server preconfigured with a database named inventory:

docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

The following information is displayed:

2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events
2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.
Version: '5.7.25-log'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)
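
Before moving on, it can help to confirm from the host that the published port is reachable. The following is a minimal sketch, not part of the original tutorial: the helper name wait_for_port and its timeout values are assumptions chosen for illustration, and a successful TCP connection only shows that something is listening on the port, not that MySQL has finished initializing.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=30.0, interval_s=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect means a listener is accepting on this port.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)
```

For the container started above, calling wait_for_port("localhost", 3306) should return True once the port mapping is active.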

Step 2: Start standalone Pulsar service

Start the Pulsar service locally in standalone mode. Support for running Debezium connectors in Pulsar IO was introduced in Pulsar 2.3.0. Download the Pulsar 2.3.0 binary release and the matching pulsar-io-kafka-connect-adaptor-2.3.0.nar file. All Pulsar IO connectors are packaged as separate NAR files.

$ wget
$ wget
$ tar zxf apache-pulsar-2.3.0-bin.tar.gz
$ cd apache-pulsar-2.3.0
$ mkdir connectors
$ cp ../pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors
$ bin/pulsar standalone
[Screenshot: Pulsar standalone starting]

Step 3: Start the Debezium MySQL connector in Pulsar IO

In another terminal tab, start the Debezium MySQL connector in Pulsar IO in local run mode. The “debezium-mysql-source-config.yaml” file contains all of the configuration, and the main parameters are listed under the “configs” node. These include the "task.class" parameter, the MySQL-related parameters (such as server, port, user, and password), and the names of the two Pulsar topics used for "history" and "offset" storage.

$ bin/pulsar-admin source localrun --sourceConfigFile debezium-mysql-source-config.yaml

The content in the “debezium-mysql-source-config.yaml” file is as follows.

tenant: "test"
namespace: "test-namespace"
name: "debezium-kafka-source"
topicName: "kafka-connect-topic"
archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar"

parallelism: 1

configs:
  ## sourceTask
  task.class: "io.debezium.connector.mysql.MySqlConnectorTask"

  ## config for mysql, docker image: debezium/example-mysql:0.8
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "dbz"
  database.server.id: "184054"
  database.server.name: "dbserver1"
  database.whitelist: "inventory"

  database.history: ""
  database.history.pulsar.topic: "history-topic"
  database.history.pulsar.service.url: "pulsar://"
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  pulsar.service.url: "pulsar://"
  offset.storage.topic: "offset-topic"
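
A quick sanity check on the “configs” node before launching the connector can catch a missing parameter early. The sketch below is illustrative only: the REQUIRED_KEYS list and the missing_keys helper are assumptions, not something Pulsar IO or Debezium provides, and the dictionary is a hypothetical in-memory copy of part of the configuration above.

```python
# Keys this tutorial's configuration relies on. Treating them as "required" is an
# assumption made for illustration, not a rule enforced by Pulsar IO.
REQUIRED_KEYS = (
    "task.class",
    "database.hostname",
    "database.port",
    "database.user",
    "database.password",
    "database.history.pulsar.topic",
    "pulsar.service.url",
)


def missing_keys(configs):
    """Return the required keys that are absent or empty in the configs mapping."""
    return [key for key in REQUIRED_KEYS if not configs.get(key)]


# Hypothetical, deliberately incomplete copy of the "configs" node.
example = {
    "task.class": "io.debezium.connector.mysql.MySqlConnectorTask",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
}

print(missing_keys(example))
# ['database.history.pulsar.topic', 'pulsar.service.url']
```

An empty list means every key in the assumed list is present with a non-empty value.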

The example MySQL server creates its tables automatically on startup, so the Debezium connector reads the historical records from the beginning of the MySQL binlog. In the output, you can see that the connector has already been triggered and has processed 47 records.

[Screenshot: connector starting to process records]

For more information on how to manage connectors, see the Pulsar IO documentation.

Records captured by Debezium are automatically published to Pulsar topics. In a new terminal, list the current topics in Pulsar with the following command:

$ bin/pulsar-admin topics list public/default
[Screenshot: list of Pulsar topics]

The change data for each changed table is stored in a separate Pulsar topic. In addition to the table-related topics, two more topics, “history-topic” and “offset-topic”, store the history and offset data.
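
The table-related topic names in this tutorial follow a server name, database, table pattern, as in dbserver1.inventory.products. A minimal sketch of that naming, assuming the default public tenant and default namespace used here (the table_topic helper is hypothetical, not part of any Pulsar or Debezium API):

```python
def table_topic(server_name, database, table, tenant="public", namespace="default"):
    """Build the fully qualified Pulsar topic name for one captured table."""
    return f"persistent://{tenant}/{namespace}/{server_name}.{database}.{table}"


print(table_topic("dbserver1", "inventory", "products"))
# persistent://public/default/dbserver1.inventory.products
```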


Step 4: Subscribe to Pulsar topics to monitor MySQL changes

Take the persistent://public/default/dbserver1.inventory.products topic as an example. Use the following CLI command to consume this topic and watch for changes to the “products” table.

$ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0

The output is as follows:

22:17:41.201 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/ - R:localhost/]
22:17:41.223 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/ -- consumer: 0

You can also consume the offset topic to monitor how the offsets change as the table changes are stored in the persistent://public/default/dbserver1.inventory.products Pulsar topic.

$ bin/pulsar-client consume -s "sub-offset" offset-topic -n 0

Step 5: Make changes in MySQL server, and verify that changes are recorded in Pulsar topics immediately

Start a Docker container that runs the MySQL CLI, and use it to make changes to the “products” table in the MySQL server.

$ docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

After you run the command, the MySQL CLI prompt is displayed. Change the names of two items in the “products” table:

mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products;
mysql> UPDATE products SET name='1111111111' WHERE id=101;
mysql> UPDATE products SET name='1111111111' WHERE id=107;
[Screenshot: MySQL updates]

In the terminal where you consume the products topic, you can see that two changes have been added.

[Screenshot: products topic showing the two MySQL updates]
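
To see what an update changed, you can compare the row state before and after the change in the event. The sketch below assumes the message value is JSON in Debezium's usual envelope, with "before", "after", and "op" fields under a "payload" key. The exact layout of the messages published by the connector in this setup is an assumption, and SAMPLE_EVENT is a hypothetical, heavily trimmed event rather than captured output.

```python
import json

# Hypothetical, trimmed update event; real events carry more fields
# (for example schema and source metadata).
SAMPLE_EVENT = json.dumps({
    "payload": {
        "before": {"id": 101, "name": "scooter"},
        "after": {"id": 101, "name": "1111111111"},
        "op": "u",
    }
})


def changed_columns(raw_event):
    """Return {column: (old, new)} for every column whose value differs."""
    payload = json.loads(raw_event)["payload"]
    # "before" is null for inserts and "after" is null for deletes.
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


print(changed_columns(SAMPLE_EVENT))
# {'name': ('scooter', '1111111111')}
```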

In the terminal where you consume the offset topic, you find that two offsets have been added.

[Screenshot: offset topic updated]

In the terminal where you local-run the connector, you find two more records have been processed.

[Screenshot: connector output showing more processed records]

Step 6: Clean up

Use “Ctrl + C” to stop the processes running in the terminals. Use “docker ps” and “docker kill” to stop the MySQL-related containers.

mysql> quit

$ docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                               NAMES
84d66c2f591d        debezium/example-mysql:0.8   "docker-entrypoint.s…"   About an hour ago   Up About an hour    0.0.0.0:3306->3306/tcp, 33060/tcp   mysql

$ docker kill 84d66c2f591d

To delete the Pulsar data, delete the data directory in the Pulsar binary directory.

$ pwd

$ rm -rf data


The Pulsar IO framework lets you run the Debezium connectors for change data capture, streaming data changes from different databases into Apache Pulsar. In this tutorial, you’ve learned how to capture data changes in a MySQL database and propagate them to Pulsar. We are continuously improving support for running the Debezium connectors with Apache Pulsar, and it will be much easier to use after the Pulsar 2.4.0 release.

Jia Zhai

Jia is a core software engineer at StreamNative and a PMC member of both Apache BookKeeper and Apache Pulsar, and he contributes to both projects continually. He lives in Beijing, China.

