

Debezium 0.7.5 Is Released

It’s my pleasure to announce the release of Debezium 0.7.5!

This is a bugfix release to the 0.7 release line, which we decided to do while working towards Debezium 0.8. Most notably it fixes an unfortunate bug introduced in 0.7.3 (DBZ-663), where the internal database history topic of the Debezium MySQL connector could be partly deleted under some specific conditions. Please see the dedicated blog post on this issue to find out whether this affects you and what you should do to prevent this issue.

Together with this, we released a couple of other fixes and improvements. Thanks to Maciej Brynski, the performance of the logical table routing SMT has been improved significantly (DBZ-655). Another fix contributed by Maciej is for DBZ-646, which lets the MySQL connector handle CREATE TABLE statements for the TokuDB storage engine.

And we got some more bugfixes by our fantastic community: Long-term community member Peter Goransson fixed an issue about the snapshot JMX metrics of the MySQL connector, which are now also accessible after the snapshot has been completed (DBZ-640). Andrew Tongen spotted and fixed an issue for the Debezium embedded engine (DBZ-665) which caused offsets to be committed more often than needed. And Matthias Wessendorf upgraded the Debezium dependencies and Docker images to Apache Kafka 1.0.1 (DBZ-647).

Thank you all for your help!

Please refer to the change log for the complete list of changes in Debezium 0.7.5.

What’s next?

Please see the previous release announcement for the next planned features. Due to the unplanned 0.7.5 release, though, the schedule of the next one will likely be extended a little bit.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


A Note On Database History Topic Configuration

A user of the Debezium connector for MySQL informed us about a potential issue with the configuration of the connector’s internal database history topic, which may cause the deletion of parts of that topic (DBZ-663). Please continue reading if you’re using the Debezium MySQL connector in versions 0.7.3 or 0.7.4.

What is the issue about?

In Debezium 0.7.3 we rolled out a feature for creating the database history automatically if it doesn’t exist yet (DBZ-278). While this feature sets the retention time for the topic to an "infinite" period, it doesn’t specify the "retention.bytes" option for the history topic. This may cause parts of the history topic to be deleted in case all of the following conditions are met:

  • You are using versions 0.7.3 or 0.7.4 of the Debezium connector for MySQL

  • The database history topic has been created by the connector (i.e. you haven’t created it yourself)

  • The broker level option "log.retention.bytes" is set to another value than -1 (note that the default is -1, in which case things work as intended)

  • The database history topic grows beyond the threshold configured via "log.retention.bytes"

If the history topic is incomplete, the connector will fail to recover the database history after a restart of the connector and will not continue with reading the MySQL binlog.
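To check whether you might be affected, you can inspect the topic-level configuration overrides of your history topic with the standard Kafka tooling; if retention.bytes doesn’t show up there, the broker-level log.retention.bytes setting applies:

<KAFKA_DIR>/bin/kafka-configs.sh \
  --zookeeper zookeeper:2181 \
  --entity-type topics \
  --entity-name <DB_HISTORY_TOPIC> \
  --describe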

How to prevent the issue?

You should either create the database history topic yourself with an infinite retention or alternatively override the "retention.bytes" configuration for the history topic created by the connector:

<KAFKA_DIR>/bin/kafka-configs.sh \
  --zookeeper zookeeper:2181 \
  --entity-type topics \
  --entity-name <DB_HISTORY_TOPIC> \
  --alter \
  --add-config retention.bytes=-1
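If you’d rather go with the first option and create the history topic yourself before starting the connector, a command along the following lines can be used (the history topic should have a single partition; the replication factor here is just an example and should match your environment):

<KAFKA_DIR>/bin/kafka-topics.sh \
  --zookeeper zookeeper:2181 \
  --create \
  --topic <DB_HISTORY_TOPIC> \
  --partitions 1 \
  --replication-factor 1 \
  --config retention.ms=-1 \
  --config retention.bytes=-1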

In case parts of the history topic have been removed already, you can use the snapshot mode schema_only_recovery to re-create the history topic, provided no schema changes have happened since the last committed offset of the connector. Otherwise, a complete new snapshot should be taken, e.g. by setting up a new connector instance.
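For reference, switching an existing connector to that snapshot mode is just a matter of adding the corresponding option to its configuration (shown as a fragment here, not as a complete connector config):

"snapshot.mode": "schema_only_recovery"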

Next steps

We’ll release Debezium 0.7.5 with a fix for this issue early next week. Note that previously created database history topics should be re-configured as described above. Please don’t hesitate to get in touch in the comments below, the chat room or the mailing list in case you have any further questions on this issue.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Creating DDD aggregates with Debezium and Kafka Streams

Microservice-based architectures are an ongoing industry trend and are thus increasingly found in enterprise applications. One possible way to keep data synchronized across multiple services and their backing data stores is to make use of an approach called change data capture, or CDC for short.

Essentially, CDC allows you to listen to any modifications occurring at one end of a data flow (i.e. the data source) and communicate them as change events to other interested parties or store them in a data sink. Instead of doing this in a point-to-point fashion, it’s advisable to decouple this flow of events between data sources and data sinks. Such a scenario can be implemented based on Debezium and Apache Kafka with relative ease and effectively no coding.

As an example, consider the following microservice-based architecture of an order management system:

Microservice-based architecture of an order management system

This system comprises three services, Order, Item and Stock. If the Order service receives an order request, it will need information from the other two, such as item definitions or the stock count for specific items. Instead of making synchronous calls to these services to obtain this information, CDC can be used to set up change event streams for the data managed by the Item and Stock services. The Order service can subscribe to these event streams and keep a local copy of the relevant item and stock data in its own database. This approach helps to decouple the services (e.g. no direct impact of service outages) and can also be beneficial for overall performance, as each service can hold an optimized local view of just those data items owned by other services that it is interested in.

How to Handle Aggregate Objects?

There are use cases, however, where things are a bit more tricky. It is sometimes useful to share information across services and data stores by means of so-called aggregates, a concept/pattern defined by domain-driven design (DDD). In general, a DDD aggregate is used to transfer state which can comprise multiple different domain objects that together are treated as a single unit of information.

Concrete examples are:

  • customers and their addresses which are represented as a customer record aggregate storing a customer and a list of addresses

  • orders and corresponding line items which are represented as an order record aggregate storing an order and all its line items

Chances are that the data of the domain objects backing these DDD aggregates are stored in separate relations of an RDBMS. When making use of the CDC capabilities currently found in Debezium, all changes to domain objects will be captured independently and by default eventually reflected in separate Kafka topics, one per RDBMS relation. While this behaviour is tremendously helpful for a lot of use cases, it can be pretty limiting for others, like the DDD aggregate scenario described above. Therefore, this blog post explores how DDD aggregates can be built based on Debezium CDC events, using the Kafka Streams API.

Capturing Change Events from a Data Source

The complete source code for this blog post is provided in the Debezium examples repository on GitHub. Begin by cloning this repository and changing into the kstreams directory:

git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/kstreams

The project provides a Docker Compose file with services for all the components you may already know from the Debezium tutorial: MySQL, ZooKeeper, Kafka and a Kafka Connect instance hosting the Debezium source connector.

In addition, it declares the following services:

  • MongoDB which will be used as a data sink

  • Another Kafka Connect instance which will host the MongoDB sink connector

  • A service for running the DDD aggregation process we’re going to build in the following

We’ll get to those three in a bit; for now, let’s prepare the source side of our pipeline:

export DEBEZIUM_VERSION=0.7
docker-compose up mysql zookeeper kafka connect_source

Once all services have been started, register an instance of the Debezium MySQL connector by submitting the following JSON document:

{
    "name": "mysql-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "table.whitelist": "inventory.customers,inventory.addresses",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones":"false"
    }
}

To do so, save the above document as mysql-source.json and run the following curl command:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @mysql-source.json

This sets up the connector for the specified database, using the given credentials. For our purposes we’re only interested in changes to the customers and addresses tables, hence the table.whitelist property is set to select just these two tables. Another noteworthy thing is the "unwrap" transform that is applied. By default, Debezium’s CDC events contain the old and new state of changed rows and some additional metadata on the source of the change. By applying the UnwrapFromEnvelope SMT (single message transformation), only the new state will be propagated into the corresponding Kafka topics.
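If you’d like to verify that the connector has been registered successfully and its task is running, you can query the status endpoint of Kafka Connect’s REST API:

curl -s http://localhost:8083/connectors/mysql-source/status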

We can take a look at these topics once the connector has been deployed and has finished its initial snapshot of the two captured tables:

docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers # or dbserver1.inventory.addresses

For instance, you should see the following output (formatted and omitting the schema information for the sake of readability) for the topic with customer changes:

{
    "schema": { ... },
    "payload": {
        "id": 1001
    }
}
{
    "schema": { ... },
    "payload": {
        "id": 1001,
        "first_name": "Sally",
        "last_name": "Thomas",
        "email": "sally.thomas@acme.com"
    }
}
...

Building DDD Aggregates

The KStreams application is going to process data from the two Kafka topics. These topics receive CDC events based on the customers and addresses relations found in MySQL, each of which has its corresponding Jackson-annotated POJO (Customer and Address), enriched by a field holding the CDC event type (i.e. UPSERT/DELETE).

Since the Kafka topic records are in Debezium JSON format with unwrapped envelopes, a special SerDe has been written that can read/write these records using either their POJO or their Debezium event representation. While the serializer simply converts the POJOs into JSON using Jackson, the deserializer is a "hybrid" one, being able to deserialize either Debezium CDC events or jsonified POJOs.
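The actual SerDe implementation ships with the example project; just to illustrate the idea, here is a minimal sketch of such a hybrid deserializer (the class name and constructor are made up for this post and Jackson is assumed to be on the classpath), which binds either the "payload" element of a Debezium event or a plain jsonified POJO to the target type:

import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class HybridJsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> targetType;

    public HybridJsonDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            // tombstone record
            return null;
        }
        try {
            JsonNode node = mapper.readTree(data);

            // unwrapped Debezium events still carry a "schema"/"payload" envelope;
            // a re-serialized POJO doesn't, so fall back to the whole document
            JsonNode value = node.has("payload") ? node.get("payload") : node;

            return mapper.treeToValue(value, targetType);
        }
        catch (IOException e) {
            throw new RuntimeException("Couldn't deserialize record from topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

Together with a straightforward Jackson-based serializer, such a deserializer can be turned into a complete SerDe via Serdes.serdeFrom(), yielding e.g. the customerSerde and addressSerde instances used in the following snippets.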

With that in place, the KStreams topology to create and maintain DDD aggregates on-the-fly can be built as follows:

Customers Topic ("parent")

All the customer records are simply read from the customers topic into a KTable which automatically maintains the latest state per customer according to the record key (i.e. the customer’s PK).

KTable<DefaultId, Customer> customerTable =
        builder.table(parentTopic, Consumed.with(defaultIdSerde,customerSerde));

Addresses Topic ("children")

For the address records the processing is a bit more involved and needs several steps. First, all the address records are read into a KStream.

KStream<DefaultId, Address> addressStream = builder.stream(childrenTopic,
        Consumed.with(defaultIdSerde, addressSerde));

Second, a 'pseudo' grouping of these address records is done based on their keys (the original primary key in the relation). During this step the relationships towards the corresponding customer records are maintained. This effectively allows keeping track of which address record belongs to which customer record, even in the face of address record deletions. To achieve this, an additional LatestAddress POJO is introduced which stores the latest known PK <→ FK relation in addition to the Address record itself.

KTable<DefaultId,LatestAddress> tempTable = addressStream
        .groupByKey(Serialized.with(defaultIdSerde, addressSerde))
        .aggregate(
                () -> new LatestAddress(),
                (DefaultId addressId, Address address, LatestAddress latest) -> {
                    latest.update(
                        address, addressId, new DefaultId(address.getCustomer_id()));
                    return latest;
                },
                Materialized.<DefaultId,LatestAddress,KeyValueStore<Bytes, byte[]>>
                        as(childrenTopic+"_table_temp")
                            .withKeySerde(defaultIdSerde)
                                .withValueSerde(latestAddressSerde)
        );

Third, the intermediate KTable is converted back into a KStream. The LatestAddress records are transformed to have the customer id (FK relationship) as their new key in order to group them per customer. During the grouping step, customer-specific addresses are updated, which can result in an address record being added or deleted. For this purpose, another POJO called Addresses is introduced, which holds a map of address records that gets updated accordingly. The result is a KTable holding the most recent Addresses per customer id.

KTable<DefaultId, Addresses> addressTable = tempTable.toStream()
        .map((addressId, latestAddress) ->
            new KeyValue<>(latestAddress.getCustomerId(),latestAddress))
        .groupByKey(Serialized.with(defaultIdSerde,latestAddressSerde))
        .aggregate(
                () -> new Addresses(),
                (customerId, latestAddress, addresses) -> {
                    addresses.update(latestAddress);
                    return addresses;
                },
                Materialized.<DefaultId,Addresses,KeyValueStore<Bytes, byte[]>>
                        as(childrenTopic+"_table_aggregate")
                            .withKeySerde(defaultIdSerde)
                                .withValueSerde(addressesSerde)
        );

Combining Customers With Addresses

Finally, it’s easy to bring customers and addresses together by joining the customers KTable with the addresses KTable, thereby building the DDD aggregates which are represented by the CustomerAddressAggregate POJO. At the end, the KTable changes are written to a KStream, which in turn gets saved into a Kafka topic. This allows the resulting DDD aggregates to be used in manifold ways.

KTable<DefaultId,CustomerAddressAggregate> dddAggregate =
          customerTable.join(addressTable, (customer, addresses) ->
              customer.get_eventType() == EventType.DELETE ?
                      null :
                      new CustomerAddressAggregate(customer,addresses.getEntries())
          );

  dddAggregate.toStream().to("final_ddd_aggregates",
                              Produced.with(defaultIdSerde,(Serde)aggregateSerde));

Records in the customers KTable might receive a CDC delete event. If so, this can be detected by checking the event type field of the customer POJO and e.g. returning 'null' instead of a DDD aggregate. Such a convention can be helpful whenever consuming parties also need to act on deletions accordingly.

Running the Aggregation Pipeline

Having implemented the aggregation pipeline, it’s time to give it a test run. To do so, build the poc-ddd-aggregates Maven project which contains the complete implementation:

mvn clean package -f poc-ddd-aggregates/pom.xml

Then run the aggregator service from the Compose file which takes the JAR built by this project and launches it using the java-jboss-openjdk8-jdk base image:

docker-compose up -d aggregator

Once the aggregation pipeline is running, we can take a look at the aggregated events using the console consumer:

docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic final_ddd_aggregates

Transferring DDD Aggregates to Data Sinks

We originally set out to build these DDD aggregates in order to transfer data and synchronize changes between a data source (MySQL tables in this case) and a convenient data sink. DDD aggregates are typically complex data structures, and therefore it makes perfect sense to write them to data stores which offer flexible ways to query and/or index them. Among NoSQL databases, a document store seems the most natural choice, with MongoDB being the leading database for such use cases.

Thanks to Kafka Connect and its numerous ready-to-use connectors, it is almost effortless to get this done. Using a MongoDB sink connector from the open-source community, it is easy to have the DDD aggregates written into MongoDB. All it needs is a proper configuration which can be posted to the REST API of Kafka Connect in order to run the connector.

So let’s start MongoDB and another Kafka Connect instance for hosting the sink connector:

docker-compose up -d mongodb connect_sink

If the DDD aggregates should be written unmodified into MongoDB, the configuration can be as simple as this:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "final_ddd_aggregates",
        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
        "mongodb.collection": "customers_with_addresses",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
        "mongodb.delete.on.null.values": true
    }
}

As with the source connector, deploy this connector using curl (with the above configuration saved as mongodb-sink.json):

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8084/connectors/ -d @mongodb-sink.json

This connector will consume messages from the "final_ddd_aggregates" Kafka topic and write them as MongoDB documents into the "customers_with_addresses" collection.

You can take a look by firing up a Mongo shell and querying the collection’s contents:

docker-compose exec mongodb bash -c 'mongo inventory'

> db.customers_with_addresses.find().pretty()
{
    "_id": {
        "id": "1001"
    },
    "addresses": [
        {
            "zip": "76036",
            "_eventType": "UPSERT",
            "city": "Euless",
            "street": "3183 Moore Avenue",
            "id": "10",
            "state": "Texas",
            "customer_id": "1001",
            "type": "SHIPPING"
        },
        {
            "zip": "17116",
            "_eventType": "UPSERT",
            "city": "Harrisburg",
            "street": "2389 Hidden Valley Road",
            "id": "11",
            "state": "Pennsylvania",
            "customer_id": "1001",
            "type": "BILLING"
        }
    ],
    "customer": {
        "_eventType": "UPSERT",
        "last_name": "Thomas",
        "id": "1001",
        "first_name": "Sally",
        "email": "sally.thomas@acme.com"
    }
}

Due to the combination of the data in a single document, some parts are redundant or no longer needed. To get rid of any unwanted data (e.g. the _eventType and customer_id of each address sub-document), it is also possible to adapt the configuration in order to blacklist said fields.
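The sink connector used here provides field projection settings for exactly this purpose. As a rough illustration only (the property names and especially the syntax for addressing nested fields are based on the connector’s documentation at the time of writing and may differ between versions, so please verify them against the version you’re running), such a blacklist could be added to the sink connector configuration like this:

"mongodb.value.projection.type": "blacklist",
"mongodb.value.projection.list": "customer._eventType,addresses._eventType,addresses.customer_id"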

Finally, update some customer or address data in the MySQL source database:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

mysql> update customers set first_name= "Sarah" where id = 1001;

Shortly thereafter, you should see that the corresponding aggregate document in MongoDB has been updated accordingly.

Drawbacks and Limitations

While this first version for creating DDD aggregates from table-based CDC events basically works, it is very important to understand its current limitations:

  • not generically applicable thus needs custom code for POJOs and intermediate types

  • cannot be scaled across multiple instances as is, due to the missing but necessary re-partitioning of the data prior to processing

  • limited to building aggregates based on a single JOIN between 1:N relationships

  • resulting DDD aggregates are eventually consistent, meaning that it is possible for them to temporarily exhibit intermediate state before converging

The first few can be addressed with a reasonable amount of work on the KStreams application. The last one, dealing with the eventually consistent nature of the resulting DDD aggregates, is much harder to correct and will require some work on Debezium’s own CDC mechanism.

Outlook

In this post we described an approach for creating aggregated events from Debezium’s CDC events. In a follow-up blog post we may dive a bit deeper into the topic of how to horizontally scale the DDD aggregate creation by running multiple KStreams aggregator instances. For that purpose, the data needs proper re-partitioning before running the topology. In addition, it could be interesting to look into a somewhat more generic version which only needs custom classes to describe the two main POJOs involved.

We also thought about providing a ready-to-use component which would work in a generic way (based on Connect records, i.e. not tied to a specific serialization format such as JSON) and could be set up as a configurable stand-alone process running given aggregations.

Also on the topic of dealing with eventual consistency we got some ideas, but those will need some more exploration and investigation for sure. Stay tuned!

We’d love to hear your feedback on the topic of event aggregation. If you have any ideas or thoughts on the subject, please get in touch by posting a comment below or sending a message to our mailing list.


Debezium 0.7.4 Is Released

It’s my pleasure to announce the release of Debezium 0.7.4!

Continuing the 0.7 release line, this new version brings several bug fixes and a handful of new features. We recommend this upgrade to all users. When upgrading from earlier versions, please check out the release notes of all versions between the one you’re currently on and 0.7.4 in order to learn about any steps potentially required for upgrading.

New features

In terms of new features, there’s a new mode for handling decimal columns in Postgres and MySQL (DBZ-611). By setting the decimal.handling.mode connector option to string, Debezium will emit decimal and numeric columns as strings. That is oftentimes easier for consumers to handle than the byte-array based representation used by default, while keeping the full precision. As a bonus, string also allows conveying the special numeric values NaN and Infinity as supported by Postgres. Note that this functionality required an update to Debezium’s logical decoding plug-in which runs within the Postgres database server. This plug-in must be upgraded to the new version before upgrading the Debezium Postgres connector.
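Enabling the new mode is a matter of a single entry in the connector configuration (fragment only; all other connector options remain unchanged):

"decimal.handling.mode": "string"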

Speaking of byte arrays, the BYTEA column type in Postgres is now also supported (DBZ-605).

For the MySQL connector, there’s a new option for the snapshotting routine: snapshot.locking.mode (DBZ-602). Setting this to NONE allows skipping any table locks during snapshotting. This should be used if and only if you’re absolutely sure that the tables don’t undergo structural changes (columns added, removed etc.) while the snapshot is taken. But if that’s guaranteed, the new mode can be a useful tool for increasing overall system performance, as writes by concurrent processes won’t be blocked. That’s especially useful in environments such as Amazon RDS, where the connector otherwise would be required to keep a lock for the entirety of the snapshot. The new option supersedes the existing snapshot.minimal.locks option. Please see the connector documentation for the details. This feature was contributed by our community member Stephen Powis; many thanks to you!
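Again, this is a single entry in the MySQL connector configuration; use it with the caveats above about concurrent schema changes in mind:

"snapshot.locking.mode": "none"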

Bug Fixes

0.7.4 brings multiple fixes related to how numeric columns are handled. E.g. columns without scale couldn’t correctly be processed by the MySQL connector during binlog reading (DBZ-615). That’s fixed now. And when using the Postgres connector, arbitrary precision column values are correctly converted into change data message fields now (DBZ-351).

We also noticed a regression introduced in Debezium 0.6: the field schema for NUMERIC columns was always marked as optional, even if the column was actually declared as NOT NULL. The same affected the geo-spatial array types on Postgres supported as of Debezium 0.7. This has been fixed with DBZ-635. We don’t expect any impact on consumers from this change (just as before, they’ll always get a value for such fields, only the schema won’t be incorrectly marked as optional any more).

Please see the full change log for more details and the complete list of issues fixed in Debezium 0.7.4.

What’s next?

Following our three-week release cadence, the next Debezium release is planned for March 28th. We’ve got some exciting changes in the works for that: if things go as planned, we’ll release the first version of our Oracle connector (DBZ-20). This will be based on the Oracle XStream API in the first iteration and won’t support snapshots yet. But we felt it’d make sense to roll out this connector incrementally, so as to get the new feature out early and collect feedback on it. We’ve also planned to explore alternatives to using the XStream API in future releases.

Another great new feature will be Reactive Streams support (DBZ-566). Built on top of the existing embedded mode, this will make it very easy to consume change data events using Reactive Streams implementations such as RxJava 2, the Java 9 Flow API and many more. It’ll also be very useful for consuming change events in reactive frameworks such as Vert.x. We’re really looking forward to shipping this feature and already have a pending pull request for it. If you like, take a look and let us know about your feedback!

Please also check out our roadmap for the coming months of Debezium’s development. This is our current plan for the things we’ll work on, but it’s not cast in stone, so please tell us about your feature requests by sending a message to our Google group. We’re looking forward to your feedback!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Debezium 0.7.3 Is Released

I’m very happy to announce the release of Debezium 0.7.3!

This is primarily a bugfix release, but we’ve also added a handful of smaller new features. It’s a recommended upgrade for all users. When upgrading from earlier versions, please check out the release notes of all versions between the one you’re currently on and 0.7.3 in order to learn about any steps potentially required for upgrading.

Let’s take a closer look at some of the new features.

All Connectors

Using the new connector option tombstones.on.delete you can now control whether a tombstone event should be emitted upon record deletions or not (DBZ-582). Emitting tombstones is usually the right thing to do and thus remains the default behaviour. But disabling them may be desirable in certain situations, and this gets a bit easier now using that option (previously you’d have to use an SMT, a single message transform, which for instance isn’t supported when using Debezium’s embedded mode). This feature was contributed by our community member Raf Liwoch. Thanks!
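Disabling tombstones therefore comes down to a single entry in the connector configuration, e.g.:

"tombstones.on.delete": "false"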

We’ve also spent some time on a few operational aspects: the sourceInfo element of Debezium’s change data messages contains a new field representing the version of the connector that created the message (DBZ-593). This lets message consumers take specific action based on the version. For instance, this can be helpful where a new Debezium release fixes a bug which consumers so far had to work around: after the update to the new Debezium version, that workaround should not be applied anymore, and the version field allows consumers to decide whether to apply it or not.
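In a change message, the new field shows up within the source block roughly like this (abbreviated example; the remaining source fields depend on the connector):

"source": {
    "version": "0.7.3",
    "name": "dbserver1",
    ...
}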

The names of all the threads managed by Debezium are now structured in the form of "debezium-<connector>-…​" (DBZ-587). This helps with identifying Debezium’s threads when analyzing thread dumps for instance.

Postgres Connector

Here we’ve focused on improving the support for array types: besides fixing a bug related to numeric arrays (DBZ-577) we’ve also completed the support for the PostGIS types (which was introduced in 0.7.2), allowing you to capture array columns of types GEOMETRY and GEOGRAPHY.

Snapshots are now correctly interruptible (DBZ-586), and the connector will correctly handle the case where, after a restart, it should continue from a WAL position which isn’t available any more: it’ll stop, requiring you to do a new snapshot (DBZ-590).

MySQL Connector

The MySQL connector can create the DB history topic automatically, if needed (DBZ-278). This means you don’t have to create that topic yourself and you also don’t need to rely on Kafka’s automatic topic creation any longer (any change data topics will automatically be created by Kafka Connect).

Also, the connector can optionally emit messages to a dedicated heartbeat topic at a configurable interval (DBZ-220). This comes in handy in situations where you only want to capture tables with low traffic, while other tables in the database are changed more frequently. In that case, no messages would have been emitted to Kafka Connect for a long time, and thus no offset would have been committed either. This could have caused trouble when restarting the connector: it would try to resume from the last committed offset, which may not be available in the binlogs any longer. But as the captured tables didn’t change, it actually wouldn’t be necessary to resume from such an old binlog position. All this is avoided by emitting messages to the heartbeat topic regularly, which causes the latest offset the connector has seen to be committed.
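The feature is disabled by default and gets enabled by setting the heartbeat interval in the connector configuration, e.g. to emit a heartbeat message every 60 seconds (the name of the heartbeat topic is derived from a configurable prefix; please see the connector documentation for the details):

"heartbeat.interval.ms": "60000"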

We’ll roll out this change to the other connectors, too, in future releases.

What’s next?

Please see the full change log for more details and the complete list of issues fixed in Debezium 0.7.3.

The next release is scheduled for March 7th. We’ll still have to decide whether that will be 0.7.4 or 0.8.0, depending on how far we are by then with our work on the Oracle connector (DBZ-137).

Please also see our roadmap describing our ideas for the future development of Debezium. This is our current thinking of the things we’d like to tackle in the coming months, but it’s not cast in stone, so please let us know about your feature requests by sending a message to our Google group. We’re looking forward to your feedback!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.

