Debezium 0.5.1 Released

It’s my pleasure to announce the release of Debezium 0.5.1!

This release fixes several bugs in the MySQL, Postgres and MongoDB connectors. There’s also support for some new data types: POINT on MySQL (DBZ-222) and TSTZRANGE on Postgres (DBZ-280). This release is a drop-in replacement for 0.5.0; upgrading is recommended for all users.
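
For illustration, these are the kinds of columns that the connectors can now capture in change events (the table names here are made up):

-- MySQL (DBZ-222)
CREATE TABLE locations (id INT PRIMARY KEY, coords POINT);

-- Postgres (DBZ-280)
CREATE TABLE bookings (id INT PRIMARY KEY, period TSTZRANGE);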

Note that in the rather unlikely case that you happened to enable Debezium for the MySQL system tables, any configured table filters will now be applied to these system tables, too (DBZ-242). This may require an adjustment of your filters if you indeed wanted to capture all system tables but only selected non-system tables.
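
As a small sketch (database and table names are made up), a connector that should keep capturing a system database alongside an application database would now need filters along these lines:

"database.whitelist": "inventory,mysql",
"table.whitelist": "inventory.orders,mysql.user"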

Please refer to the changelog for an overview of all 29 issues fixed in Debezium 0.5.1.

The Docker image containing Kafka Connect and all the Debezium 0.5.x connectors as well as the image containing Postgres and the Debezium logical decoding plug-in have been updated to 0.5.1, too.

As Debezium continues to evolve and grow, the number of people contributing to the project is also going up. The following people have sent in pull requests for this release: Aaron Rosenberg, Alexander Kukushkin, Brendan Maguire, Duncan Sands, David Leibovic, Jiri Pechanec, nacivida, Omar Al-Safi, Randall Hauch and Tom Bentley.

Thanks a lot to you and everyone else in the community contributing via feature requests, bug reports, discussions and questions!

What’s next

We’ve planned further bug-fix releases for the 0.5.x line. Specifically, we’ll shortly release a fix for DBZ-217, which is about the MySQL connector stumbling over a corrupt event in the binlog.

In parallel we’re looking into Debezium connectors for SQL Server and Oracle. While we cannot promise anything yet in terms of when these will be ready to be published, we hope to have at least one of them ready some time soon. Stay tuned and get involved!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Debezium 0.5.0 Released

We’re happy to announce that Debezium 0.5.0 is now available for use with Kafka Connect 0.10.2.0. This release also includes a few fixes for the MySQL connector. See the release notes for specifics on these changes, and be sure to check out the Kafka documentation for compatibility with the version of the Kafka broker that you are using.

Kafka Connect 0.10.2.0 comes with a significant new feature called Single Message Transforms, and you can now use them with Debezium connectors. SMTs allow you to modify the messages produced by Debezium connectors and any other Kafka Connect source connectors before those messages are written to Kafka. SMTs can also be used with Kafka Connect sink connectors to modify the messages before the sink connectors process them. You can use SMTs to filter out or mask specific fields, add new fields, modify existing fields, change the topic and/or topic partition to which the messages are written, and more. You can even chain multiple SMTs together.

Kafka Connect comes with a number of built-in SMTs that you can simply configure and use, but you can also create your own SMT implementations to do more complex and interesting things. For example, although Debezium connectors normally map all of the changes in each table (or collection) to separate topics, you can write a custom SMT that uses a completely different mapping between tables and topics and even add fields to message keys and/or values. Using your new SMT is also very easy: simply put it on the Kafka Connect classpath and update the connector configuration to use it.
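
As a minimal sketch (the topic names are made up), routing all of a Debezium connector’s topics under a common prefix takes nothing more than the built-in RegexRouter, configured via a few extra connector properties:

"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "dbserver1\\.inventory\\.(.*)",
"transforms.route.replacement": "inventory-$1"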

We’ve also added Debezium Docker images labelled 0.5 and latest, which we use in our tutorial.

Thanks to Sanjay and everyone in the community for their help with this release, issues, discussions, contributions, and questions!

What’s next

We’ll continue to improve the MongoDB, MySQL, and PostgreSQL connectors and push out 0.5.x releases with fixes. And we’re still working on connectors for SQL Server and Oracle. Stay tuned and get involved!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Debezium 0.4.1 Released

We’re happy to announce that Debezium 0.4.1 is now available for use with Kafka Connect 0.10.1.1. This release includes several fixes for the MongoDB connector and MySQL connector, including improved support for Amazon RDS and Amazon Aurora (MySQL compatibility). See the release notes for specifics on these changes.

We’ve also updated the Debezium Docker images labelled 0.4 and latest, which we use in our tutorial.

Thanks to Jan, Horia, David, Josh, Johan, Sanjay, Saulius, and everyone in the community for their help with this release, issues, discussions, contributions, and questions!

What’s next

Kafka 0.10.2.0 is out, so we plan to release 0.5.0 next week with all of the changes/fixes in 0.4.1, but with support for Kafka 0.10.2.0. We’ll then continue to improve the MongoDB, MySQL, and PostgreSQL connectors and push out 0.5.x releases. Stay tuned and get involved!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Streaming databases in realtime with MySQL, Debezium, and Kafka

This post originally appeared on the WePay Engineering blog.

Change data capture has been around for a while, but some recent developments in technology have given it new life. Notably, using Kafka as a backbone to stream your database data in realtime has become increasingly common.

If you’re wondering why you might want to stream database changes into Kafka, I highly suggest reading The Hardest Part About Microservices: Your Data. At WePay, we wanted to integrate our microservices and downstream datastores with each other, so every system could get access to the data that it needed. We use Kafka as our data integration layer, so we needed a way to get our database data into it.

Last year, Yelp’s engineering team published an excellent series of posts on their data pipeline. These included a discussion on how they stream MySQL data into Kafka. Their architecture involves a series of homegrown pieces of software to accomplish the task, notably schematizer and MySQL streamer. The write-up triggered a thoughtful post on Debezium’s blog about a proposed equivalent architecture using Kafka Connect, Debezium, and Confluent’s schema registry. This proposed architecture is what we’ve been implementing at WePay, and this post describes how we leverage Debezium and Kafka Connect to stream our MySQL databases into Kafka.

Architecture

The flow of data starts with each microservice’s MySQL database. These databases run in Google Cloud as CloudSQL MySQL instances with GTIDs enabled. We’ve set up a downstream MySQL cluster specifically for Debezium. Each CloudSQL instance replicates its data into the Debezium cluster, which consists of two MySQL machines: a primary (active) server and secondary (passive) server. This single Debezium cluster is an operational trick to make it easier for us to operate Debezium. Rather than having Debezium connect to dozens of microservice databases directly, we can connect to just a single database. This also isolates Debezium from impacting the production OLTP workload that the master CloudSQL instances are handling.

We run one Debezium connector (in distributed mode on the Kafka Connect framework) for each microservice database. Again, the goal here is isolation. Theoretically, we could run a single Debezium connector that produces messages for all databases (since all microservice databases are in the Debezium cluster). This approach would actually be more resource-efficient, since each Debezium connector has to read MySQL’s entire binlog anyway. We opted not to do this because we wanted to be able to bring Debezium connectors up and down, and to configure them differently for each microservice DB.

The Debezium connectors feed the MySQL messages into Kafka (and add their schemas to the Confluent schema registry), where downstream systems can consume them. We use our Kafka Connect BigQuery connector to load the MySQL data into BigQuery using BigQuery’s streaming API. This gives us a data warehouse in BigQuery that is usually less than 30 seconds behind the data that’s in production. Other microservices, stream processors, and data infrastructure consume the feeds as well.

Debezium architecture

Debezium

The remainder of this post will focus on Debezium (the DBZ boxes in the diagram above), and how we configure and operate it. Debezium works by connecting to MySQL and pretending to be a replica. MySQL sends its replication data to Debezium, thinking it’s actually funneling data to another downstream MySQL instance. Debezium then takes the data, converts the MySQL schemas to Kafka Connect structures, and forwards them to Kafka.
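
For this to work, the MySQL user that Debezium connects as needs the replication-related privileges. A minimal grant looks something like this (user name and password are placeholders, not our real ones):

mysql> CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';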

Adding new databases

When a new microservice with a CloudSQL database comes online, we want to get that data into Kafka. The first step in the process is to load the data into the Debezium MySQL cluster. This involves several steps:

  1. Take a MySQL dump of the data in the microservice DB.

  2. Pause the secondary Debezium MySQL DB.

  3. Load the MySQL dump into the secondary Debezium MySQL DB.

  4. Reset the GTID_PURGED parameter to include the GTID from the new DB dump.

  5. Unpause the secondary Debezium MySQL DB.

  6. Update HA Proxy to point to the secondary, which now becomes the primary.

  7. Follow steps 2-5 for the old primary instance (now secondary).

The actual commands that we run are:

# (1) Take a dump of the database we wish to add.
$ mydumper  --host=123.123.123.123 --port=3306 --user=foo --password=*********  -B log --trx-consistency-only  --triggers --routines -o /mysqldata/new_db/ -c -L mydumper.log

# (2) Stop all replication on the secondary Debezium cluster.
mysql> STOP SLAVE FOR CHANNEL 'foo';
mysql> STOP SLAVE FOR CHANNEL 'bar';
mysql> STOP SLAVE FOR CHANNEL 'baz';

# Get the current GTID purged values from MySQL.
mysql> SHOW GLOBAL VARIABLES LIKE '%gtid_purged%';

# (3) Load the dump of the database into the Debezium cluster.
$ myloader -d /mysqldata/new_db/ -s new_db

# (4) Clear out the existing GTID_PURGED value so that we can overwrite it to include the GTID from the new dump file.
mysql> RESET MASTER;

# Set the new GTID_PURGED value, including the GTID_PURGED value from the MySQL dump file.
mysql> SET GLOBAL GTID_PURGED="f3a44d1a-11e6-44ba-bf12-040bab830af0:1-10752,c627b2bc-b36a-11e6-a886-42010af00790:1-9052,01261abc3-6ade-11e6-9647-42010af0044a:1-375342";

# (5) Start replication for the new DB.
mysql> CHANGE MASTER TO MASTER_HOST='123.123.123.123', MASTER_USER='REPLICATION_USER', MASTER_PASSWORD='REPLICATION_PASSWORD', MASTER_AUTO_POSITION=1 FOR CHANNEL 'new_db';
mysql> START SLAVE FOR CHANNEL 'new_db';

# Start replication for the DBs that we paused.
mysql> START SLAVE FOR CHANNEL 'foo';
mysql> START SLAVE FOR CHANNEL 'bar';
mysql> START SLAVE FOR CHANNEL 'baz';

# Repeat steps 2-5 on the old primary (now secondary).

At the end of these steps, both the primary and secondary Debezium MySQL servers have the new database. Once finished, we can add a new Debezium connector to the Kafka Connect cluster. This connector will have configuration that looks roughly like this:

{
   "name": "debezium-connector-microservice1",
   "config": {
       "name": "debezium-connector-microservice1",
       "connector.class": "io.debezium.connector.mysql.MySqlConnector",
       "tasks.max": "1",
       "database.hostname": "dbz-mysql01",
       "database.port": "3306",
       "database.user": "user",
       "database.password": "*******",
       "database.server.id": "101",
       "database.server.name": "db.debezium.microservice1",
       "gtid.source.includes": "c34aeb9e-89ad-11e6-877b-42010a93af2d",
       "database.whitelist": "microservice1_db",
       "poll.interval.ms": "2",
       "table.whitelist": "microservice1_db.table1,microservice1_db.table2",
       "column.truncate.to.1024.chars" : "microservice1_db.table1.text_col",
       "database.history.kafka.bootstrap.servers": "kafka01:9093,kafka02:9093,kafka03:9093",
       "database.history.kafka.topic": "debezium.history.microservice1",
       "database.ssl.truststore": "/certs/truststore",
       "database.ssl.truststore.password": "*******",
       "database.ssl.mode": "required",
       "database.history.producer.security.protocol": "SSL",
       "database.history.producer.ssl.truststore.location": "/certs/truststore",
       "database.history.producer.ssl.truststore.password": "*******",
       "database.history.consumer.security.protocol": "SSL",
       "database.history.consumer.ssl.truststore.location": "/certs/truststore",
       "database.history.consumer.ssl.truststore.password": "*******",
   }
}

The details on these configuration fields are located in the Debezium documentation for the MySQL connector.
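
Registering the connector is then a single call to the Kafka Connect REST API. Assuming the JSON above is saved as microservice1.json and a worker is listening on the default port 8083 (the host name is a placeholder), the call looks like:

$ curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
    http://connect01:8083/connectors/ -d @microservice1.json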

The new connector will start up and begin snapshotting the database, since this is the first time it’s been started. Debezium’s snapshot implementation (see DBZ-31) uses an approach very similar to MySQL’s mysqldump tool. Once the snapshot is complete, Debezium will switch over to using MySQL’s binlog to receive all future database updates.

Kafka Connect and Debezium work together to periodically commit Debezium’s location in the MySQL binlog, described by a MySQL global transaction ID (GTID). When Debezium restarts, Kafka Connect will give it the last committed MySQL GTID, and Debezium will pick up from there.

Note that commits only happen periodically, so Debezium might start up from a location in the log prior to the last row that it received. In such a case, you will observe duplicate messages in the Debezium Kafka topics. Debezium writes messages to Kafka with an at-least-once messaging guarantee.
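
If you’re curious where a connector currently stands, the committed source offsets can be inspected by reading the Kafka Connect offsets topic directly. The topic name is whatever offset.storage.topic is set to in the worker config; connect-offsets and the broker address below are assumptions:

$ kafka-console-consumer.sh --bootstrap-server kafka01:9092 \
    --topic connect-offsets --from-beginning \
    --property print.key=true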

High availability

One of the difficulties we faced when we first began using Debezium was how to make it tolerant of machine failures (of both the upstream MySQL server and Debezium itself). Prior to version 5.6, MySQL modeled a replica’s location in its parent’s binlogs using a (binlog filename, file offset) tuple. The problem with this approach is that the binlog filenames are not the same between MySQL machines. This means that a replica reading from upstream MySQL machine 1 can’t easily fail over to MySQL machine 2. There is an entire ecosystem of tools (including MHA) that try to address this problem.

MySQL 5.6 introduced the concept of global transaction IDs. These GTIDs identify a specific location within the MySQL binlog across machines. This means that a consumer reading from a binlog on one MySQL server can switch over to the other, provided that both servers have the data available. This is how we run our systems. Both the CloudSQL instances and the Debezium MySQL cluster run with GTIDs enabled. The Debezium MySQL servers also have replication binlogs enabled so that binlogs exist for Debezium to read (replicas don’t have binlogs enabled by default). All of this enables Debezium to consume from the primary Debezium MySQL server, but switch over to the secondary (via HA Proxy) if there’s a failure.
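
Concretely, the relevant settings in my.cnf on the Debezium MySQL servers look roughly like this (a sketch, not our literal configuration):

[mysqld]
server_id                = 2
log_bin                  = mysql-bin
binlog_format            = ROW   # Debezium needs row-level binlog events
gtid_mode                = ON
enforce_gtid_consistency = ON
log_slave_updates        = ON    # write replicated transactions to this server's own binlog
expire_logs_days         = 4     # retain enough binlog for Debezium to catch up after downtime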

If the machine that Debezium itself is running on fails, then the Kafka Connect framework fails the connector over to another machine in the cluster. When the failover occurs, Debezium receives its last committed offset (GTID) from Kafka Connect and picks up where it left off (with the same caveat as above: you might see some duplicate messages due to the periodic commit frequency).

An important configuration that needs to be called out is the gtid.source.includes field that we set above. When we first set up the topology described in the architecture section, we discovered that we could not fail over from the primary Debezium DB to the secondary DB even though both were replicating exactly the same data. This is because, in addition to the GTIDs for the various upstream DBs that both the primary and secondary machines are replicating, each machine has its own server UUID for its local MySQL databases (e.g. information_schema). Because the two servers have different UUIDs, MySQL got confused when we triggered a failover: Debezium’s GTID included the server UUID of the primary server, which the secondary server didn’t know about. The fix was to filter out all UUIDs that we don’t care about from the GTID. Each Debezium connector filters out all server UUIDs except the UUID for the microservice DB that it cares about. This allows the connector to fail over from primary to secondary without issue. This issue is documented in detail in DBZ-129.
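
Finding the UUIDs to include or exclude is straightforward, since each MySQL server reports its own UUID and the full set of transactions it has executed:

mysql> SELECT @@server_uuid;
mysql> SHOW GLOBAL VARIABLES LIKE 'gtid_executed';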

Schemas

Debezium’s message format includes both the "before" and "after" versions of a row. For inserts, the "before" is null. For deletes, the "after" is null. Updates have both the "before" and "after" fields filled out. The messages also include some server information such as the server ID that the message came from, the GTID of the message, the server timestamp, and so on.

{
  "before": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
  },
  "after": {
    "id": 1004,
    "first_name": "Anne Marie",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
  },
  "source": {
    "name": "mysql-server-1",
    "server_id": 223344,
    "ts_sec": 1465581,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 484,
    "row": 0,
    "snapshot": null
  },
  "op": "u",
  "ts_ms": 1465581029523
}

The serialization format that Debezium sends to Kafka is configurable. We prefer Avro at WePay for its compact size, schema DDL, performance, and rich ecosystem. We’ve configured Kafka Connect to use Confluent’s Avro converter. This converter serializes messages to Avro, and also registers the schemas with Confluent’s schema registry.
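
In the Kafka Connect worker configuration, this amounts to pointing the key and value converters at Confluent’s AvroConverter and at the registry (the registry URL is a placeholder):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schemaregistry01:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schemaregistry01:8081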

If a MySQL table’s schema is changed, Debezium adapts to the change by updating the structure and schema of the "before" and "after" portions of its event messages. This will appear to the Avro encoder as a new schema, which it will register with the schema registry before the message is sent to Kafka. The registry runs full compatibility checks to make sure that downstream consumers don’t break due to a schema evolution.
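
The compatibility level can be inspected and tightened per subject through the registry’s REST API; for example (host and subject name are placeholders):

# Check the global compatibility level.
$ curl http://schemaregistry01:8081/config

# Require full (backward and forward) compatibility for one subject.
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "FULL"}' \
    http://schemaregistry01:8081/config/db.debezium.microservice1.microservice1_db.table1-value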

Note that it’s still possible to make an incompatible change in the MySQL schema itself, which would break downstream consumers. We have not yet added automatic compatibility checks to MySQL table alters.

Future work

Monolithic database

In addition to our microservices, we have a legacy monolithic database that’s much larger than our microservice databases. We’re in the process of upgrading this cluster to run with GTIDs enabled. Once this is done, we plan to replicate this cluster into Kafka with Debezium as well.

Large table snapshots

We’re lucky that all of our microservice databases are of relatively manageable size. Our monolithic database has some tables that are much larger. We have yet to test Debezium with very large tables, so it’s unclear if any tuning or patches will be required in order to snapshot these tables on the initial Debezium load. We have heard community reports that larger tables (6 billion+ rows) do work, provided that the configuration exposed in DBZ-152 is set. This is work we’re planning to do shortly.

More monitoring

Kafka Connect doesn’t currently make it easy to expose metrics through the Kafka metrics framework. As a result, there are very few metrics available from the Kafka Connect framework. Debezium does expose metrics via JMX (see DBZ-134), but we aren’t currently exposing them to our metrics system. We do monitor the system, but when things go wrong, it can be difficult to determine what’s going on. KAFKA-2376 is the open JIRA that’s meant to address the underlying Kafka Connect issue.
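
In the meantime, the JMX metrics can at least be eyeballed by exporting a JMX port before starting the Connect worker and attaching a JMX client (the port number and host name are arbitrary placeholders):

$ export JMX_PORT=9010
$ connect-distributed.sh worker.properties

# In another terminal, attach e.g. jconsole to the worker.
$ jconsole connect01:9010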

More databases

As we add more microservice databases, we’ll begin to put pressure on the two Debezium MySQL servers that we have. Eventually, we plan to split the single Debezium cluster that we have into more than one, with some microservices replicating only to one cluster, and the rest replicating to others.

Unify compatibility checks

As I mentioned in the schemas section above, the Confluent schema registry runs schema compatibility checks out of the box. This makes it very easy for us to prevent backward- and forward-incompatible changes from making their way into Kafka. We don’t currently have an equivalent check at the MySQL layer, which is a problem because it means a DBA can make incompatible changes there; Debezium will then fail when trying to produce the new messages to Kafka. We need to make sure this can’t happen by adding equivalent checks at the MySQL layer. DBZ-70 discusses this in more detail.

Automatic topic configuration

We currently run Kafka with topic auto-creation enabled, with a default of 6 partitions and time-based/size-based retention. This configuration doesn’t make much sense for Debezium topics. At the very least, they should be using log compaction as their retention policy. We plan to write a script that looks for misconfigured Debezium topics and updates them to appropriate retention settings.
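
Until that script exists, an individual topic can be fixed up by hand with the stock tooling (the ZooKeeper address and topic name are placeholders):

$ kafka-configs.sh --zookeeper zk01:2181 --alter \
    --entity-type topics \
    --entity-name db.debezium.microservice1.microservice1_db.table1 \
    --add-config cleanup.policy=compact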

Conclusion

We’ve been running Debezium in production for the past 8 months. Initially, we ran it dark, and then enabled it for the realtime BigQuery pipeline shown in the architecture diagram above. Recently, we’ve begun consuming the messages in microservices and stream processing systems. We look forward to adding more data to the pipeline, and addressing some of the issues that were raised in the Future work section.

A special thanks to Randall Hauch, who has been invaluable in addressing a number of bug fixes and feature requests.


Debezium 0.4.0 Released

We’re happy to announce that Debezium 0.4.0 is now available for use with Kafka Connect 0.10.1.1. This release introduces a new PostgreSQL connector, and contains over a dozen fixes combined for the MongoDB connector and MySQL connector, including preliminary support for Amazon RDS and Amazon Aurora (MySQL compatibility). See the release notes for specifics on these changes.

We’ve also created Debezium Docker images labelled 0.4 and latest, which we use in our tutorial.

Thanks to Horia, Chris, Akshath, Ramesh, Matthias, Anton, Sagi, barton, and others for their help with this release, issues, discussions, contributions, and questions!

What’s next

We’ll continue to improve the MongoDB, MySQL, and PostgreSQL connectors and push out 0.4.x releases. We’re also going to work on a few new connectors, though we’ll likely increase the minor version with each new connector. Stay tuned and get involved!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.

