Today, I am pleased to announce the second alpha release in the 2.2 release stream, Debezium 2.2.0.Alpha2. This release includes a plethora of bug fixes, improvements, breaking changes, and a number of new features including, but not limited to, a new ExtractChangedRecordState single message transformation, a Reactive-based implementation of the Debezium Outbox extension for Quarkus, a Debezium Storage module for Apache RocketMQ, and much more. Let’s take a moment and dive into these new features, improvements, and breaking changes.

Breaking Changes

We typically try to avoid any breaking changes, even during minor releases such as this; however, sometimes breaking changes are inevitable given the circumstances. Debezium 2.2.0.Alpha2 includes three breaking changes:

Topic / Schema naming changes

Debezium previously sanitized topic and schema names by using an underscore (_) to replace non-ASCII characters that would lead to unsupported topic or schema names when using schema registries. However, if this non-ASCII character was the only difference between two similar topics or schema names that otherwise only varied by case, this would lead to other problems.

In order to address this in the most compatible way, Debezium now uses a strategy-based approach to map characters uniquely. As a side effect of this change, the sanitize.field.names configuration property has been retired and replaced by this new strategy-based approach.

Each connector supports two configuration properties to control this behavior:

schema.name.adjustment.mode

Specifies how schema names should be adjusted for compatibility with the message converter.

field.name.adjustment.mode

Specifies how field names should be adjusted for compatibility with the message converter.

These two connector configuration properties support three modes:

none

No adjustment is made; schema and field names are passed as-is.

avro

Replaces characters that cannot be used in Avro with an underscore (_).

avro_unicode

Replaces underscores (_) and characters that cannot be used in Avro with unicode-based characters.

This now allows you to pick the most appropriate strategy based on your table or collection naming convention.
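To see why replacing characters with an underscore can be ambiguous, here is a minimal Python sketch of the two adjustment styles. It is not Debezium's implementation: the set of valid characters is simplified, and the `_uXXXX` escape format is an assumption chosen only for illustration.

```python
import re

# Simplified model: only ASCII letters, digits, and underscore are valid.
# (Assumption for illustration; real Avro naming rules have more detail.)
INVALID_CHAR = re.compile(r"[^A-Za-z0-9_]")


def adjust_avro(name: str) -> str:
    """Replace each invalid character with an underscore (lossy)."""
    return INVALID_CHAR.sub("_", name)


def adjust_avro_unicode(name: str) -> str:
    """Escape underscores and invalid characters as _uXXXX.

    The _uXXXX escape format is a hypothetical choice for this sketch.
    """
    out = []
    for ch in name:
        if ch == "_" or INVALID_CHAR.match(ch):
            out.append("_u{:04x}".format(ord(ch)))
        else:
            out.append(ch)
    return "".join(out)


names = ["order-items", "order.items", "order_items"]
print([adjust_avro(n) for n in names])          # all three collapse to one name
print([adjust_avro_unicode(n) for n in names])  # three distinct names
```

Because the underscore itself is escaped in the second function, a name that already contains an underscore cannot collide with one where an underscore was introduced by the adjustment.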

Source info block changes with Oracle connector

All Debezium change events related to inserts, updates, and deletes contain a source info block in the event’s payload. For the Oracle connector, this block contains a special field called ssn that represents the SQL sequence number for this change.

We identified corner cases where the value sourced from the database for this field could exceed 2,147,483,647, the maximum value of an INT32 data type. To fix this, we’ve changed the data type from INT32 to INT64, which allows values up to 9,223,372,036,854,775,807.

This change should be entirely non-invasive, but we wanted to bring attention to this should you have pipelines that could be storing this value in a sink system or if you are using a schema registry.
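If you want to check whether a downstream column or field type can hold the wider value, the following Python sketch shows the two ranges involved. The helper names are hypothetical; it only uses the standard `struct` module to test whether a number fits in a signed 32-bit or 64-bit integer.

```python
import struct

INT32_MAX = 2**31 - 1  # 2,147,483,647
INT64_MAX = 2**63 - 1  # 9,223,372,036,854,775,807


def _fits(fmt: str, value: int) -> bool:
    """Return True if value can be packed using the given struct format."""
    try:
        struct.pack(fmt, value)
        return True
    except (struct.error, OverflowError):
        return False


def fits_int32(value: int) -> bool:
    """True if value fits in a signed 32-bit integer."""
    return _fits(">i", value)


def fits_int64(value: int) -> bool:
    """True if value fits in a signed 64-bit integer."""
    return _fits(">q", value)


ssn = INT32_MAX + 1  # one past the old limit
print(fits_int32(ssn), fits_int64(ssn))
```

A sink column that was sized for INT32 would reject or truncate such a value, which is why it is worth reviewing sink schemas after upgrading.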

Debezium Server moved to new repository

Debezium Server is a standalone Quarkus-based runtime for Debezium source connectors enabling the integration with various platforms like EventHubs, PubSub, Pulsar, Redis, and Kafka, to name a few. With this release, we have moved the code related to Debezium Server to its own GitHub repository.

This change was required in order to support building Debezium Server with connectors that are not part of the main Debezium repository, such as Db2, Google Spanner, Cassandra 4, and Vitess. As a result, starting with this release, Debezium Server ships with all connectors (excluding Cassandra 3) by default.

Cassandra 3 is excluded due to technical limitations with class loading that create conflicts with Cassandra 4. We are aware of this and plan to deliver a solution to include Cassandra 3 in the future.

New ExtractChangedRecordState SMT

We have heard from the community on several occasions that it would be great to have an out-of-the-box way to determine what values have changed in a Debezium change event. The new single message transform (SMT) ExtractChangedRecordState aims to deliver on this request by adding metadata to the event identifying which fields changed or were unchanged.

In order to get started with this new transformation, configure it as part of your connector configuration:

transforms=changes
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed=ChangedFields
transforms.changes.header.unchanged=UnchangedFields

This transformation can be configured to disclose which fields changed by setting header.changed, which fields are unchanged by setting header.unchanged, or both by setting both properties as shown above. For each property that is set, the transformation adds a header with the specified name whose value is a collection of the corresponding field names.
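Conceptually, the header values come from comparing the before and after state of an update event. The Python sketch below illustrates that comparison; it is not the SMT's actual implementation, and it models the two states as plain dictionaries, which is an assumption made for brevity.

```python
_MISSING = object()  # sentinel for fields absent from the "before" state


def changed_and_unchanged(before: dict, after: dict) -> tuple:
    """Split the field names of `after` into (changed, unchanged) lists."""
    changed, unchanged = [], []
    for field, new_value in after.items():
        if before.get(field, _MISSING) == new_value:
            unchanged.append(field)
        else:
            changed.append(field)
    return changed, unchanged


before = {"id": 1, "name": "Anne", "email": "anne@example.com"}
after = {"id": 1, "name": "Anne Marie", "email": "anne@example.com"}

changed, unchanged = changed_and_unchanged(before, after)
# Header names match the example configuration above.
headers = {"ChangedFields": changed, "UnchangedFields": unchanged}
print(headers)
```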

Drop fields using ExtractNewRecordState SMT

The ExtractNewRecordState single message transformation is extremely useful in situations where you need to consume the Debezium change event in a flattened format. This SMT has been changed in this release to add the ability to drop fields from the payload and the message key of the event.

This new feature introduces three new configuration properties for the transformation:

drop.fields.header.name

The Kafka message header name to use for listing field names in the source message that are to be dropped.

drop.fields.from.key

Specifies whether to also remove fields from the key; defaults to false.

drop.fields.keep.schema.compatible

Specifies whether to remove only fields that are optional; defaults to true.

When using Avro, schema compatibility is extremely important. This is why we opted to enforce schema compatibility by default. If a field is configured to be dropped but it is non-optional, the field will not be removed from the key nor the payload unless schema compatibility is disabled.

These new configuration options allow for some exciting ways to manipulate change events. For example, to emit events with only changed fields, pairing the ExtractNewRecordState with the new ExtractChangedRecordState transformation makes this extremely simple and straightforward. An example configuration to only emit changed columns would look like the following:

transforms=changes,extract
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.unchanged=UnchangedFields
transforms.extract.type=io.debezium.transforms.ExtractNewRecordState
transforms.extract.drop.fields.header.name=UnchangedFields

The above configuration omits unchanged fields from the event’s payload value. If a field in the key did not change, it will be unaffected because drop.fields.from.key was left as its default of false. And finally, if a field in the event’s payload is to be dropped because it did not change, but it’s not optional, it will continue to be included in the transformation’s output event to comply with schema compatibility.
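The interaction between the list of fields to drop and the schema compatibility setting can be sketched in a few lines of Python. This is only an illustration of the rule described above, not the transformation's code; the function name and the representation of optional fields as a set are hypothetical.

```python
def drop_fields(payload: dict, optional_fields: set, to_drop: list,
                keep_schema_compatible: bool = True) -> dict:
    """Return a copy of payload without the droppable fields.

    A field is dropped only if it is listed in to_drop and, when
    keep_schema_compatible is True, it is also optional.
    """
    result = {}
    for name, value in payload.items():
        droppable = name in to_drop and (
            not keep_schema_compatible or name in optional_fields
        )
        if not droppable:
            result[name] = value
    return result


payload = {"id": 1, "name": "Anne Marie", "email": "anne@example.com"}
optional = {"email"}          # "id" and "name" are required in this example
unchanged = ["id", "email"]   # e.g. taken from the UnchangedFields header

print(drop_fields(payload, optional, unchanged))
print(drop_fields(payload, optional, unchanged, keep_schema_compatible=False))
```

With the default setting, the required id field survives even though it did not change; disabling schema compatibility removes it as well.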

Reactive Debezium Outbox Quarkus Extension

The outbox pattern is an approach that many microservices leverage to share data across microservice boundaries. We introduced the Debezium Outbox Quarkus Extension in Debezium 1.1 back in early 2020, and it has allowed Quarkus users to leverage the outbox pattern with ease using Debezium.

Thanks to Ingmar Fjolla, Debezium 2.2.0.Alpha2 includes a new reactive-based implementation of the Debezium Outbox Quarkus Extension. This new implementation is based on Vert.x and Hibernate Reactive, providing a fully asynchronous solution to the outbox pattern using Debezium.

This new extension will be included in the Quarkus Platform releases later this quarter or in early Q2. However, if you want to get started with it today, you can easily drop it directly into your project’s configuration using the following coordinates:

Maven coordinates

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox-reactive</artifactId>
  <version>2.2.0.Alpha2</version>
</dependency>

Gradle coordinates

io.debezium:debezium-quarkus-outbox-reactive:2.2.0.Alpha2

New Rocket MQ Schema History Storage

Debezium’s new storage API has been a huge success over this past year. We initially started with our original file and Kafka based implementations for offset and schema history storage, but that has since grown to support storing schema history on other platforms such as Amazon S3 and Redis.

This release continues to expand on this by adding a new schema history storage implementation for Rocket MQ. In order to get started with storing your schema history into Rocket MQ, the debezium-storage-rocketmq dependency must first be on the classpath and accessible by the connector runtime.

Once the dependency exists, the only remaining step is to configure the schema history in the connector configuration. The following example shows basic usage of the Rocket MQ schema history:

schema.history.internal.rocketmq.topic=schema-history
schema.history.internal.rocketmq.name.srv.addr=172.17.15.2
schema.history.internal.rocketmq.acl.enabled=true
schema.history.internal.rocketmq.access.key=<rocketmq-access-key>
schema.history.internal.rocketmq.secret.key=<rocketmq-secret-key>
schema.history.internal.rocketmq.recovery.attempts=5
schema.history.internal.rocketmq.recovery.poll.interval.ms=1000
schema.history.internal.rocketmq.store.record.timeout.ms=2000

schema.history.internal.rocketmq.topic

Specifies the topic name where the schema history will be stored.

schema.history.internal.rocketmq.name.srv.addr

Specifies the address of the Rocket MQ name server used for service discovery.

schema.history.internal.rocketmq.acl.enabled

Specifies whether access control lists (ACLs) are enabled, defaults to false.

schema.history.internal.rocketmq.access.key

Specifies the Rocket MQ access key, required only if ACLs are enabled.

schema.history.internal.rocketmq.secret.key

Specifies the Rocket MQ secret key, required only if ACLs are enabled.

schema.history.internal.rocketmq.recovery.attempts

Specifies the number of consecutive attempts that return no data before recovery completes.

schema.history.internal.rocketmq.recovery.poll.interval.ms

Specifies the number of milliseconds for each poll attempt to recover the history.

schema.history.internal.rocketmq.store.record.timeout.ms

Specifies the number of milliseconds for a write to Rocket MQ to complete before timing out.

Other fixes

There were quite a number of other improvements, bug fixes, and stability changes in this release. Some of the noteworthy ones are:

  • Better control on debezium GTID usage DBZ-2296

  • Data type conversion failed for mysql bigint DBZ-5798

  • ActivateTracingSpan wrong timestamps reported DBZ-5827

  • Unable to specify column or table include list if name contains a backslash \ DBZ-5917

  • debezium-connector-cassandra 2.1.0.Alpha2 plugin can no longer run "out of the box" DBZ-5925

  • MongoDB Incremental Snapshot not Working DBZ-5973

  • Nullable columns marked with "optional: false" in DDL events DBZ-6003

  • Upgrade to Quarkus 2.16.0.Final DBZ-6005

  • Vitess: Handle the shard list difference between current db shards and persisted shards DBZ-6011

  • Offsets are not flushed on connect offsets topic when encountering an error on Postgres connector DBZ-6026

  • Unexpected format for TIME column: 8:00 DBZ-6029

  • Oracle does not support compression/logging clauses after an LOB storage clause DBZ-6031

  • debezium-server Pulsar support non-default tenant and namespace DBZ-6033

  • Debezium is logging the full message along with the error DBZ-6037

  • Improve resilience during internal schema history recovery from Kafka DBZ-6039

  • Vitess: Support Mapping unsigned bigint mysql column type to long DBZ-6043

  • Incremental snapshot sends the events from signalling DB to Kafka DBZ-6051

  • Upgrade Kafka to 3.3.2 DBZ-6054

  • Mask password in log statement DBZ-6064

  • Loading Custom offset storage fails with Class not found error DBZ-6075

  • Increase query.fetch.size default to something sensible above zero DBZ-6079

  • SQL Server tasks fail if the number of databases is smaller than maxTasks DBZ-6084

  • When using LOB support, an UPDATE against multiple rows can lead to inconsistent event data DBZ-6107

  • Expose sequence field in CloudEvents message id DBZ-6089

  • Reduce verbosity of skipped transactions if transaction has no events relevant to captured tables DBZ-6094

  • Upgrade Kafka client to 3.4.0 DBZ-6102

What’s Next?

We’re still very early in the development cycle of Debezium 2.2 and many other features are still in development, including:

  • Configurable signal channels, enabling users to send signals not only from a database table or a Kafka topic, but also from other means such as an HTTP endpoint, the file system, etc.

  • The Debezium JDBC sink connector that supports native Debezium change events out-of-the-box, without requiring the use of the Event Flattening transformation.

  • And a plethora of Debezium UI enhancements

We are about midway through the quarter, and Debezium 2.2 will enter its beta phase very soon. We would love to hear your feedback or suggestions regarding the roadmap, changes in this release, those that are outstanding, or anything we haven’t mentioned. Be sure to get in touch with us on the mailing list or our chat.

Also be on the lookout for the first installment of our 2023 newsletter, as well as the upcoming conclusion to the blog series "Debezium for Oracle", where I cover performance, debugging, and frequently asked questions about the Oracle connector.

Until next time…

Chris Cranford

Chris is a software engineer at Red Hat. He previously was a member of the Hibernate ORM team and now works on Debezium. He lives in North Carolina just a few hours from Red Hat towers.

   


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.