

Streaming MySQL Data Changes to Amazon Kinesis

Most of the time, Debezium is used to stream data changes into Apache Kafka. But what if you’re using another streaming platform such as Apache Pulsar, or a cloud-based solution such as Amazon Kinesis or Azure Event Hubs? Can you still benefit from Debezium’s powerful change data capture (CDC) capabilities and ingest changes from databases such as MySQL, Postgres or SQL Server?

Turns out, with just a bit of glue code, you can! In the following we’ll discuss how to use Debezium to capture changes in a MySQL database and stream the change events into Kinesis, a fully-managed data streaming service available on the Amazon cloud.

Introducing the Debezium Embedded Engine

Debezium is implemented as a set of Kafka Connect connectors and thus is usually run via Kafka Connect. But there’s one little gem in Debezium which isn’t widely known yet: the embedded engine.

When using this engine, the Debezium connectors are not executed within Kafka Connect, but as a library embedded into your own Java application. For this purpose, the debezium-embedded module provides a small runtime environment which performs the tasks that’d otherwise be handled by the Kafka Connect framework: requesting change records from the connector, committing offsets etc. Each change record produced by the connector is passed to a configured event handler method, which in our case will convert the record into its JSON representation and submit it to a Kinesis stream, using the Kinesis Java API.

The overall architecture looks like so:

Debezium Embedded Engine Streaming to Amazon Kinesis

Now let’s walk through the relevant parts of the code required for that. A complete executable example can be found in the debezium-examples repo on GitHub.

Set-Up

In order to use Debezium’s embedded engine, add the debezium-embedded dependency as well as the Debezium connector of your choice to your project’s pom.xml. In the following we’re going to use the connector for MySQL. We also need to add a dependency on the Kinesis Client API, so these are the dependencies needed:

...
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>0.8.3.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>0.8.3.Final</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.9.0</version>
</dependency>
...

Configuring the Embedded Engine

The Debezium embedded engine is configured through an instance of io.debezium.config.Configuration. This class can obtain values from system properties or from a given config file, but for the sake of the example we’ll simply pass all required values via its fluent builder API:

Configuration config = Configuration.create()
    .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
    .with(EmbeddedEngine.ENGINE_NAME, "kinesis")
    .with(MySqlConnectorConfig.SERVER_NAME, "kinesis")
    .with(MySqlConnectorConfig.SERVER_ID, 8192)
    .with(MySqlConnectorConfig.HOSTNAME, "localhost")
    .with(MySqlConnectorConfig.PORT, 3306)
    .with(MySqlConnectorConfig.USER, "debezium")
    .with(MySqlConnectorConfig.PASSWORD, "dbz")
    .with(MySqlConnectorConfig.DATABASE_WHITELIST, "inventory")
    .with(MySqlConnectorConfig.TABLE_WHITELIST, "inventory.customers")
    .with(EmbeddedEngine.OFFSET_STORAGE,
        "org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
    .with(MySqlConnectorConfig.DATABASE_HISTORY,
        MemoryDatabaseHistory.class.getName())
    .with("schemas.enable", false)
    .build();

If you’ve ever set up the Debezium MySQL connector in Kafka Connect, most of the properties will look familiar to you.

But let’s talk about the OFFSET_STORAGE and DATABASE_HISTORY options in a bit more detail. They deal with how connector offsets and the database history should be persisted. When running the connector via Kafka Connect, both would typically be stored in dedicated Kafka topics. But that’s not an option here, so an alternative is needed. For this example we’re simply going to keep the offsets and database history in memory. That is, if the engine is restarted, this information will be lost and the connector will start from scratch, e.g. with a new initial snapshot.

While out of scope for this blog post, it wouldn’t be too difficult to create alternative implementations of the OffsetBackingStore and DatabaseHistory contracts. For instance, if you’re fully committed to the AWS cloud, you could consider storing offsets and database history in the DynamoDB NoSQL store. Note that, unlike Kafka, a Kinesis stream wouldn’t be suitable for storing the database history: the maximum retention period for Kinesis data streams is seven days, whereas the database history must be kept for the entire lifetime of the connector. Another alternative is to use the existing filesystem-based implementations, FileOffsetBackingStore and FileDatabaseHistory.
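
If you wanted to survive restarts without a new snapshot while still avoiding Kafka, the file-based variant could be configured roughly like the following. This is a minimal sketch, not taken from the accompanying example; the file paths are placeholders and the property names should be verified against the Debezium and Kafka Connect versions you are using:

Configuration config = Configuration.create()
    // ... connector properties as shown above ...
    .with(EmbeddedEngine.OFFSET_STORAGE,
        "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    // file in which the FileOffsetBackingStore keeps the connector offsets
    .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME,
        "/var/debezium/offsets.dat")
    .with(MySqlConnectorConfig.DATABASE_HISTORY,
        "io.debezium.relational.history.FileDatabaseHistory")
    // file in which the FileDatabaseHistory keeps the DDL history
    .with("database.history.file.filename",
        "/var/debezium/dbhistory.dat")
    .build();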

The next step is to build an EmbeddedEngine instance from the configuration. Again this is done using a fluent API:

EmbeddedEngine engine = EmbeddedEngine.create()
    .using(config)
    .using(this.getClass().getClassLoader())
    .using(Clock.SYSTEM)
    .notifying(this::sendRecord)
    .build();

The most interesting part here is the notifying call. The method passed here is the one which will be invoked by the engine for each emitted data change record. So let’s take a look at the implementation of this method.

Sending Change Records to Kinesis

The sendRecord() method is where the magic happens. We’ll convert the incoming SourceRecord into an equivalent JSON representation and propagate it to a Kinesis stream.

For that, it’s important to understand some conceptual differences between Apache Kafka and Kinesis. Specifically, messages in Kafka have a key and a value (both of which are arbitrary byte arrays). In the case of Debezium, the key of a data change event represents the primary key of the affected record, and the value is a structure comprising the old and new row state as well as some additional metadata.

In Kinesis, on the other hand, a message contains a data blob (again an arbitrary byte sequence) and a partition key. Kinesis streams can be split up into multiple shards, and the partition key is used to determine into which shard a given message should go.

Now one could think of mapping the key from Debezium’s change data events to the Kinesis partition key, but partition keys are limited to a length of 256 bytes. Depending on the length of the primary key column(s) in the captured tables, this might not be enough. A safer option is therefore to create a hash value from the change message key and use that as the partition key. This in turn means that the change message key structure should be added to the Kinesis message’s data blob, next to the actual value: while the key column values themselves are part of the value structure, too, a consumer otherwise wouldn’t know which column(s) make up the primary key.

With that in mind, let’s take a look at the sendRecord() implementation:

private void sendRecord(SourceRecord record) {
    // We are interested only in data events not schema change events
    if (record.topic().equals("kinesis")) {
        return;
    }

    // create schema for container with key *and* value
    Schema schema = SchemaBuilder.struct()
        .field("key", record.keySchema())
        .field("value", record.valueSchema())
        .build();

    Struct message = new Struct(schema);
    message.put("key", record.key());
    message.put("value", record.value());

    // create partition key by hashing the record's key
    String partitionKey = String.valueOf(
        record.key() != null ? record.key().hashCode() : -1);

    // create data blob representing the container by using Kafka Connect's
    // JSON converter
    final byte[] payload = valueConverter.fromConnectData(
        "dummy", schema, message);

    // Assemble the put-record request ...
    PutRecordRequest putRecord = new PutRecordRequest();

    putRecord.setStreamName(record.topic());
    putRecord.setPartitionKey(partitionKey);
    putRecord.setData(ByteBuffer.wrap(payload));

    // ... and execute it
    kinesisClient.putRecord(putRecord);
}

The code is quite straightforward: as discussed above, it first creates a container structure holding the key and value of the incoming source record. This structure is then converted into a binary representation using the JSON converter provided by Kafka Connect (an instance of JsonConverter). Finally, a PutRecordRequest is assembled from that blob, the partition key and the change record’s topic name, and sent to Kinesis.
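
The valueConverter used above is an instance of Kafka Connect’s JsonConverter; its setup isn’t shown in the snippet. One minimal way to configure it (as a value converter, without embedding the schema into each message) might look like this, as a sketch:

// requires org.apache.kafka.connect.json.JsonConverter and java.util.Collections
JsonConverter valueConverter = new JsonConverter();

// "false" as the second argument configures it as a value (not key) converter;
// disabling schemas.enable keeps the schema out of each serialized message
valueConverter.configure(
    Collections.singletonMap("schemas.enable", false), false);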

The Kinesis client object can be re-used and is set up once like so:

// Uses the credentials from the local "default" AWS profile
AWSCredentialsProvider credentialsProvider =
    new ProfileCredentialsProvider("default");

this.kinesisClient = AmazonKinesisClientBuilder.standard()
    .withCredentials(credentialsProvider)
    .withRegion("eu-central-1") // use your AWS region here
    .build();

With that, we’ve set up an instance of Debezium’s EmbeddedEngine which runs the configured MySQL connector and passes each emitted change event to Amazon Kinesis. The last missing step is to actually run the engine. This is done on a separate thread using an Executor, e.g. like so:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

Note that you should also make sure to properly shut down the engine eventually. How this can be done is shown in the accompanying example in the debezium-examples repo.
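
As a rough sketch (assuming the engine and executor created above are still accessible, e.g. as fields of the application class), a shutdown hook could look like this:

// requires java.util.concurrent.TimeUnit
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // ask the embedded engine to stop polling the connector for change records
    engine.stop();

    // then give it a little time to finish processing in-flight records
    executor.shutdown();
    try {
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));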

Running the Example

Finally let’s take a look at running the complete example and consuming the Debezium CDC events from the Kinesis stream. Start by cloning the examples repository and changing into the kinesis directory:

git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/kinesis

Make sure you’ve met the prerequisites described in the example’s README.md; most notably you should have a local Docker installation, and you’ll need to have set up an AWS account as well as have the AWS client tools installed. Note that Kinesis isn’t part of the free tier when registering with AWS, i.e. you’ll pay a (small) amount of money when executing the example. Don’t forget to delete the streams you’ve set up once you’re done; we won’t pay your AWS bills :)

Now run Debezium’s MySQL example database to have some data to play with:

docker run -it --rm --name mysql -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw \
  debezium/example-mysql:0.8

Create a Kinesis stream for change events from the customers table:

aws kinesis create-stream --stream-name kinesis.inventory.customers \
  --shard-count 1

Execute the Java application that runs the Debezium embedded engine (if needed, adjust the value of the kinesis.region property in pom.xml to your own region first):

mvn exec:java

This will start up the engine and the MySQL connector, which takes an initial snapshot of the captured database.

In order to take a look at the CDC events in the Kinesis stream, the AWS CLI can be used (usually, you’d implement a Kinesis Streams application for consuming the events). To do so, set up a shard iterator first:

ITERATOR=$(aws kinesis get-shard-iterator --stream-name kinesis.inventory.customers --shard-id 0 --shard-iterator-type TRIM_HORIZON | jq '.ShardIterator')

Note how the jq utility is used to obtain the generated id of the iterator from the JSON structure returned by the Kinesis API. Next that iterator can be used to examine the stream:

aws kinesis get-records --shard-iterator $ITERATOR

You should receive an array of records like this:

{
    "Records": [
        {
            "SequenceNumber":
                "49587760482547027816046765529422807492446419903410339842",
            "ApproximateArrivalTimestamp": 1535551896.475,
            "Data": "eyJiZWZvcm...4OTI3MzN9",
            "PartitionKey": "eyJpZCI6MTAwMX0="
        },
        ...
    ]
}

The Data element is a Base64-encoded representation of the message’s data blob. Again jq comes in handy: we can use it to just extract the Data part of each record and decode the Base64 representation (make sure to use jq 1.6 or newer):

aws kinesis get-records --shard-iterator $ITERATOR | \
  jq -r '.Records[].Data | @base64d' | jq .

Now you should see the change events as JSON, each one with key and value:

{
  "key": {
    "id": 1001
  },
  "value": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "0.8.1.Final",
      "name": "kinesis",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "c",
    "ts_ms": 1535555325628
  }
}
...

Next let’s try and update a record in MySQL:

# Start MySQL CLI client
docker run -it --rm --name mysqlterm --link mysql mysql:5.7 \
  sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" \
  -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

# In the MySQL client
use inventory;
update customers set first_name = 'Trudy' where id = 1001;

If you now fetch the iterator again, you should see one more data change event representing that update:

...

{
  "key": {
    "id": 1001
  },
  "value": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Trudy",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "0.8.1.Final",
      "name": "kinesis",
      "server_id": 223344,
      "ts_sec": 1535627629,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": false,
      "thread": 10,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "u",
    "ts_ms": 1535627622546
  }
}

Once you’re done, stop the embedded engine application by hitting Ctrl + C, stop the MySQL server by running docker stop mysql and delete the kinesis.inventory.customers stream in Kinesis.

Summary and Outlook

In this blog post we’ve demonstrated that Debezium can be used to stream data changes not only into Apache Kafka, but also into other streaming platforms such as Amazon Kinesis. By leveraging its embedded engine and implementing a bit of glue code, you can benefit from all the CDC connectors provided by Debezium and their capabilities, and connect them to the streaming solution of your choice.

And we’re thinking about simplifying this usage of Debezium even further. Instead of requiring you to implement your own application that invokes the embedded engine API, we’re considering providing a small self-contained Debezium runtime which you could simply execute. It would be configured with the source connector to run and would make use of an outbound plug-in SPI with ready-to-use implementations for Kinesis, Apache Pulsar and others. Of course such a runtime would also provide suitable implementations for safely persisting offsets and database history, and it would offer means of monitoring, health checks etc. This means you could connect the Debezium source connectors to your preferred streaming platform in a robust and reliable way, without any manual coding required!

If you like this idea, then please check out JIRA issue DBZ-651 and let us know about your thoughts, e.g. by leaving a comment on the issue, in the comment section below or on our mailing list.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Debezium 0.8.2 Released

The Debezium team is back from summer holidays and we’re happy to announce the release of Debezium 0.8.2!

This is a bugfix release to the current stable release line of Debezium, 0.8.x, while the work on Debezium 0.9 is continuing.

Note: By accident the version of the release artifacts is 0.8.2 instead of 0.8.2.Final. This is not in line with our recently established convention of always letting release versions end with qualifiers such as Alpha1, Beta1, CR1 or Final. The next version in the 0.8 line will be 0.8.3.Final and we’ll improve our release pipeline to make sure that this situation doesn’t occur again.

The 0.8.2 release contains 10 fixes overall, most of them dealing with issues related to DDL parsing as done by the Debezium MySQL connector. For instance, implicit non-nullable primary key columns are now handled correctly by the new Antlr-based DDL parser (DBZ-860). Also the MongoDB connector saw a bug fix (DBZ-838): initial snapshots will now be interrupted if the connector is requested to stop (e.g. when shutting down Kafka Connect). More of a useful improvement than a bug fix is the Postgres connector’s capability to add the table, schema and database names to the source block of emitted CDC events (DBZ-866).

Thanks a lot to community members Andrey Pustovetov, Cliff Wheadon and Ori Popowski for their contributions to this release!

What’s next?

We’re continuing the work on Debezium 0.9, which will mostly be about improvements to the SQL Server and Oracle connectors. Both will get support for handling structural changes to captured tables while the connectors are running. Also the exploration of alternatives to using the XStream API for the Oracle connector continues.

Finally, a recurring theme of our work is to further consolidate the code bases of the different connectors, which will allow us to roll out new and improved features more quickly across all the Debezium connectors. The recently added Oracle and SQL Server connectors already share a lot of code, and in the next step we’ve planned to move the existing Postgres connector to the new basis established for these two connectors.

If you’d like to learn more about some mid- and long-term ideas, please check out our roadmap. Also, please get in touch with us if you have any ideas or suggestions for future development.



Debezium 0.9 Alpha1 and 0.8.1 Released

Just two weeks after the Debezium 0.8 release, I’m very happy to announce the release of Debezium 0.9.0.Alpha1!

The main feature of the new version is a first work-in-progress version of the long-awaited Debezium connector for MS SQL Server. Based on the CDC functionality available in the Enterprise and Standard editions, the new connector lets you stream data changes out of Microsoft’s popular RDBMS.

Besides that we’ve continued the work on the Debezium Oracle connector. Most notably, it supports initial snapshots of captured tables now. We’ve also upgraded Apache Kafka in our Docker images to 1.1.1 (DBZ-829).

Please take a look at the change log for the complete list of changes in 0.9.0.Alpha1 and general upgrade notes.

Note: At the time of writing (2018-07-26), the release artifacts (connector archives) were available on Maven Central, while the Docker images for 0.9.0.Alpha1 were still being uploaded to Docker Hub. The Docker images have meanwhile been uploaded and are ready for use under the 0.9.0.Alpha1 tag and the rolling 0.9 tag.

SQL Server Connector

Support for SQL Server had been on the wish list of Debezium users for a long time (the original issue was DBZ-40). Thanks to lots of basic infrastructure created while working on the Oracle connector, we were finally able to come up with a first preview of this new connector after a comparatively short time of development.

Just like the Oracle connector, the one for SQL Server is under active development and should be considered an incubating feature at this point. For instance, the structure of emitted change messages may change in upcoming releases. In terms of features, it supports initial snapshotting and capturing changes via SQL Server’s CDC functionality. There’s support for the most common column types, table whitelisting/blacklisting and more. The most significant missing feature is support for structural changes to tables while the connector is running. This is the next feature we’ll work on, and it’s planned to be delivered as part of the next 0.9 release (see DBZ-812).

We’d be very happy to learn about any feedback you may have on this newest connector of the Debezium family. If you spot any bugs or have feature requests for it, please create a report in our JIRA tracker.

Oracle Connector

The Debezium connector for Oracle is able to take initial snapshots now. By means of the new connector option snapshot.mode you can control whether read events for all the records of all the captured tables should be emitted.

In addition, the support for numeric data types has been honed (DBZ-804): any integer columns (i.e. NUMBER with a scale <= 0) will be emitted using the corresponding int8/int16/int32/int64 field type, if the column’s precision allows for that.

We’ve also spent some time on expanding the Oracle connector documentation, which covers the structure of emitted change events and all the data type mappings in detail now.

Debezium 0.8.1.Final

Together with Debezium 0.9.0.Alpha1 we also did another release of the current stable Debezium version 0.8.

While 0.9 at this point is more interesting to those eager to try out the latest developments in the Oracle and SQL Server connectors, 0.8.1.Final is a recommended upgrade especially for users of the Postgres connector. This release fixes an issue where WAL segments on the server could be retained longer than necessary if only records of non-whitelisted tables changed for a while. This has been addressed by supporting heartbeat messages (as already known from the MySQL connector) also for Postgres (DBZ-800). This lets the connector regularly commit offsets to Kafka Connect, which also serves as the hook for acknowledging processed LSNs with the Postgres server.

You can find the list of all changes done in Debezium 0.8.1.Final in the change log.

What’s next?

As discussed above, we’ll work on supporting structural changes to captured tables while the SQL Server connector is running. The same applies to the Oracle connector. This will require some work on our DDL parsers, but thanks to the foundations provided by our recent migration of the MySQL DDL parser to Antlr, this should be manageable.

The other big focus of work will be to provide an alternative implementation for getting changes from Oracle which isn’t based on the XStream API. We’ve done some experiments with LogMiner and are also actively exploring further alternatives. While some details are still unclear, we are optimistic that we’ll have something to release in this area soon.

If you’d like to learn more about some mid- and long-term ideas, please check out our roadmap. Also, please get in touch with us if you have any ideas or suggestions for future development.



Five Advantages of Log-Based Change Data Capture

Yesterday I had the opportunity to present Debezium and the idea of change data capture (CDC) to the Darmstadt Java User Group. It was a great evening with lots of interesting discussions and questions. One of the questions was the following: what is the advantage of using a log-based change data capture tool such as Debezium over simply polling for updated records?

So first of all, what’s the difference between the two approaches? With polling-based (or query-based) CDC, you repeatedly run queries (e.g. via JDBC) to retrieve any newly inserted or updated rows from the tables to be captured. Log-based CDC, in contrast, works by reacting to changes to the database’s log files (e.g. MySQL’s binlog or MongoDB’s oplog).
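
To make that contrast a bit more tangible, here’s what the polling approach typically boils down to in code. This is just an illustrative sketch: the customers table, its last_update column, the handleChangedRow() callback and the poll interval are all hypothetical.

// requires java.sql.* and java.time.Instant
void pollForChanges(Connection connection, long pollIntervalMs)
        throws SQLException, InterruptedException {

    Timestamp lastPoll = Timestamp.from(Instant.EPOCH);

    while (true) {
        // remember when this poll started, before running the query
        Timestamp thisPoll = new Timestamp(System.currentTimeMillis());

        try (PreparedStatement stmt = connection.prepareStatement(
                "SELECT * FROM customers WHERE last_update > ?")) {
            stmt.setTimestamp(1, lastPoll);

            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    // only the latest state of each changed row is visible here
                    handleChangedRow(rs);
                }
            }
        }

        lastPoll = thisPoll;
        Thread.sleep(pollIntervalMs); // trade-off: propagation delay vs. load on the source
    }
}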

As this wasn’t the first time this question came up, I thought I could provide a more extensive answer also here on the blog. That way I’ll be able to refer to this post in the future, should the question come up again :)

So without further ado, here’s my list of five advantages of log-based CDC over polling-based approaches.

All Data Changes Are Captured

By reading the database’s log, you get the complete list of all data changes in their exact order of application. This is vital for many use cases where you are interested in the complete history of record changes. In contrast, with a polling-based approach you might miss intermediary data changes that happen between two runs of the poll loop. For instance it could happen that a record is inserted and deleted between two polls, in which case this record would never be captured by poll-based CDC.

Related to this is the aspect of downtimes, e.g. when updating the CDC tool. With poll-based CDC, only the latest state of a given record would be captured once the CDC tool is back online, missing any earlier changes to the record that occurred during the downtime. A log-based CDC tool will be able to resume reading the database log from the point where it left off before it was shut down, causing the complete history of data changes to be captured.

Low Delays of Events While Avoiding Increased CPU Load

With polling, you might be tempted to increase the frequency of polling attempts in order to reduce the chances of missing intermediary updates. While this works to some degree, polling too frequently may cause performance issues (as the queries used for polling put load on the source database). On the other hand, increasing the polling interval reduces the CPU load, but may result not only in missed change events but also in a longer delay for propagating data changes. Log-based CDC allows you to react to data changes in near real-time without paying the price of spending CPU time on running polling queries repeatedly.

No Impact on Data Model

Polling requires some indicator to identify those records that have changed since the last poll. So all the captured tables need to have some column like LAST_UPDATE_TIMESTAMP which can be used to find changed rows. This can be fine in some cases, but in others such a requirement might not be desirable. Specifically, you’ll need to make sure that the update timestamps on all captured tables are maintained correctly, either by the writing applications or e.g. through triggers.

Can Capture Deletes

Naturally, polling will not allow you to identify any records that have been deleted since the last poll. Oftentimes that’s a problem for replication-like use cases, where you’d like to have an identical data set on the source database and the replication targets, meaning you’d also like to delete records on the sink side if they have been removed in the source database.

Can Capture Old Record State And Further Meta Data

Depending on the source database’s capabilities, log-based CDC can provide the old record state for update and delete events, whereas with polling you’ll only get the current row state. Having the old row state handy in a single change event can be interesting for many use cases, e.g. if you’d like to display the complete data change with old and new column values to an application user for auditing purposes.

In addition, log-based approaches can often provide streams of schema changes (e.g. in the form of applied DDL statements) and expose additional metadata such as transaction ids or the user applying a certain change. These things may generally be doable with query-based approaches, too (depending on the capabilities of the database), but I haven’t really seen it done in practice.

Summary

And that’s it, five advantages of log-based change data capture. Note that this is not to say that polling-based CDC doesn’t have its applications. If for instance your use case can be satisfied by propagating changes once per hour and it’s not a problem to miss intermediary versions of records that were valid in between, it can be perfectly fine.

But if you’re interested in capturing data changes in near real-time and in making sure you don’t miss any change events (including deletions), then I’d very much recommend exploring the possibilities of log-based CDC as enabled by Debezium. The Debezium connectors do all the heavy lifting for you, i.e. you don’t have to deal with the low-level specifics of the individual databases and the means of getting changes from their logs. Instead, you can consume the generic and largely unified change data events produced by Debezium.



Debezium 0.8 Final Is Released

I’m very happy to announce the release of Debezium 0.8.0.Final!

The key features of Debezium 0.8 are the first work-in-progress version of our Oracle connector (based on the XStream API) and a brand-new parser for MySQL DDL statements. Besides that, there are plenty of smaller new features (e.g. propagation of default values to corresponding Connect schemas, optional propagation of source queries in CDC messages and a largely improved SMT for sinking changes from MongoDB into RDBMS) as well as lots of bug fixes (e.g. around temporal and numeric column types, large transactions with Postgres).

Please see the previous announcements (Beta 1, CR 1) to learn about all the changes in more depth. The Final release largely resembles CR1; apart from further improvements to the Oracle connector (DBZ-792) there’s one nice addition to the MySQL connector contributed by Peter Goransson: when doing a snapshot, it will now expose information about the processed rows via JMX (DBZ-789), which is very handy when snapshotting larger tables.

Please take a look at the change log for the complete list of changes in 0.8.0.Final and general upgrade notes.

What’s next?

We’re continuing our work on the Oracle connector. The work on initial snapshotting is progressing well and it should be part of the next release. Other improvements will be support for structural changes to captured tables after the initial snapshot has been made, more extensive source info metadata and more. Please track DBZ-716 for this work; the improvements are planned to be released incrementally in upcoming versions of Debezium.

We’ve also started to explore ingesting changes via LogMiner. This is more involved in terms of engineering efforts than using XStream, but it comes with the huge advantage of not requiring a separate license (LogMiner comes with the Oracle database itself). It’s not quite clear yet when we can release something on this front, and we’re also actively exploring further alternatives. But we are quite optimistic and hope to have something some time soon.

The other focus of work is a connector for SQL Server (see DBZ-40). Work on this has started as well, and there should be an Alpha1 release of Debezium 0.9 with a first drop of that connector within the next few weeks.

To find out about some more long-term ideas, please check out our roadmap, and get in touch with us if you have any ideas or suggestions for future development.


