Debezium Extensions for Quarkus
The following documentation explores how your Quarkus application can integrate Debezium.
Introduction
Debezium Extensions for Quarkus integrates the Debezium Runtime into Quarkus applications, enabling developers to consume change data capture (CDC) events from supported databases directly within a lightweight, cloud-native application.
When to use the extension
- CDC in Quarkus: Seamlessly embeds Debezium into Quarkus apps, ideal for microservices use cases.
- Kafka-free Option: Avoids the need for full Kafka infrastructure when only simple streaming or in-process processing is required.
- Lightweight & Fast: Leverages Quarkus' fast startup time and low memory footprint.
- Developer Friendly: Simplifies setup with Quarkus-native configuration and dependency management.
Features
The extension supports well-established Debezium features and much more. Here is a list of new or revisited features:
- Debezium Capturing Listener
- Debezium Custom Deserializer
- Debezium Lifecycle events
- Debezium Heartbeat events
- Debezium Custom Data Type Converter
- Debezium Notification events
- Debezium Post Processors
Prerequisites
- JDK 21+ installed with JAVA_HOME configured appropriately
- Apache Maven 3.9.9
- Quarkus version 3.25.0
- Docker or Podman
- Optionally Mandrel or GraalVM installed and configured appropriately if you want to build a native executable
Working Example
A fully functional example demonstrating the extension's capabilities is available in the debezium-example repository. It can serve as a reference for setting up and testing your own Quarkus application.
Bootstrap a Quarkus Project
This section can be skipped if you already have a Quarkus application in place.
The easiest way to create a new Quarkus Project is to open a terminal and run the following command:
mvn io.quarkus.platform:quarkus-maven-plugin:3.25.0:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=getting-started
It generates the following in ./getting-started:
- the Quarkus boilerplate
- the application configuration
Add the Extension to the project
In the project, add the Debezium extension dependency to your application’s pom.xml:
<dependencies>
    <!-- [...] -->
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>quarkus-debezium-postgres</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <!-- [...] -->
</dependencies>
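Alternatively, if your project already uses the Quarkus Maven plugin, you can add the same dependency from the command line; the coordinates below simply mirror the dependency above, and depending on whether the extension is available in the Quarkus extension registry you may still need to set the version in your pom.xml:

./mvnw quarkus:add-extension -Dextensions="io.debezium:quarkus-debezium-postgres"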
Configuring Debezium Connector
The Debezium extension ships with a connector that listens for CDC events from the datasource, but a minimal configuration is mandatory.
The extension uses Quarkus (the SmallRye Config API) to provide all configuration-related mechanisms.
Debezium configuration properties use the prefix quarkus.debezium.*. Assuming an application configuration file located in src/main/resources/application.properties, a minimal setup looks like the following:
# Debezium CDC configuration
quarkus.debezium.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
quarkus.debezium.name=native
quarkus.debezium.topic.prefix=native
quarkus.debezium.plugin.name=pgoutput
quarkus.debezium.snapshot.mode=initial
# datasource configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=<your username>
quarkus.datasource.password=<your password>
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/hibernate_orm_test
quarkus.datasource.jdbc.max-size=16
The available configuration parameters are described in the Debezium documentation. Additionally, you must specify the datasource configuration parameters required by the Debezium runtime.
Capturing events from Debezium
Continuing from the previous minimal configuration, your Quarkus application can receive CDC event payloads directly:
import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.runtime.Capturing;
@ApplicationScoped
public class ProductHandler {
@Capturing
public void capture(CapturingEvent<SourceRecord> record) {
// process your events
}
}
The CapturingEvent<T> contains information related to the kind of database operation:
@Capturing
public void capture(CapturingEvent<SourceRecord> record) {
    switch (record) {
        case Create<SourceRecord> event -> {}   // row inserted
        case Delete<SourceRecord> event -> {}   // row deleted
        case Message<SourceRecord> event -> {}  // logical decoding / generic message
        case Read<SourceRecord> event -> {}     // row read during the initial snapshot
        case Truncate<SourceRecord> event -> {} // table truncated
        case Update<SourceRecord> event -> {}   // row updated
    }
}
Filtering Change Data Capture Events
It’s possible to filter events by destination:
@Capturing(destination = "native.inventory.products")
public void capture(CapturingEvent<SourceRecord> record) {
// process your event
}
By default, a Debezium connector destination is formed from the prefix defined in the configuration combined with the database (or schema) name and the name of the table in which the change was made. In some cases the destination is redefined using an SMT (single message transformation).
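For example, with the minimal configuration above (quarkus.debezium.topic.prefix=native), a change to the products table in the inventory schema produces the destination used in the snippet above:

native.inventory.products   # <topic.prefix>.<schema>.<table>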
Deserializer via Jackson
Quarkus has built-in support for JSON serialization and deserialization based on Jackson.
There is an existing ObjectMapperDeserializer that can be used to deserialize all data objects via Jackson. The corresponding deserializer class needs to be subclassed, so we create a ProductDeserializer that extends ObjectMapperDeserializer:
public class ProductDeserializer extends ObjectMapperDeserializer<Product> {
public ProductDeserializer() {
super(Product.class);
}
}
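For reference, Product is assumed here to be a plain Jackson-mappable class whose fields mirror the captured columns; a minimal sketch could be a simple record:

// illustrative only: the actual fields depend on your products table
public record Product(Long id, String name, String description) {
}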
Finally, configure your capture channel to use the Jackson deserializer for a particular destination:
quarkus.debezium.capturing.products.destination=native.inventory.products
quarkus.debezium.capturing.products.deserializer=com.acme.product.jackson.ProductDeserializer
and use it in your code by adding the destination assigned to the deserializer:
import io.debezium.runtime.Capturing;
import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class ProductHandler {

    @Capturing(destination = "native.inventory.products")
    public void capture(CapturingEvent<Product> record) {
        // process your events
    }
}
or receive only the deserialized object, without CapturingEvent<T>. Keep in mind that in this case you don’t have information related to the database operation:
import io.debezium.runtime.Capturing;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class ProductHandler {

    @Capturing(destination = "native.inventory.products")
    public void capture(Product product) {
        // process your events
    }
}
Lifecycle events
It’s possible to get information about the status of Debezium by listening to lifecycle events:
import io.debezium.runtime.events.*;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class LifecycleListener {
public void started(@Observes ConnectorStartedEvent event) {
// your logic
}
public void stopped(@Observes ConnectorStoppedEvent connectorStoppedEvent) {
// your logic
}
public void tasksStarted(@Observes TasksStartedEvent tasksStartedEvent) {
// your logic
}
public void tasksStopped(@Observes TasksStoppedEvent tasksStoppedEvent) {
// your logic
}
public void pollingStarted(@Observes PollingStartedEvent pollingStartedEvent) {
// your logic
}
public void pollingStopped(@Observes PollingStoppedEvent pollingStoppedEvent) {
// your logic
}
public void completed(@Observes DebeziumCompletionEvent debeziumCompletionEvent) {
// your logic
}
}
The following events are available:
- ConnectorStartedEvent is fired when Debezium starts a connector
- ConnectorStoppedEvent is fired when Debezium stops a connector
- TasksStartedEvent is fired when a connector task is started
- TasksStoppedEvent is fired when a connector task is stopped
- PollingStartedEvent is fired when the Debezium engine begins polling the connector for changes
- PollingStoppedEvent is fired when the Debezium engine stops polling the connector for changes
- DebeziumCompletionEvent is fired after the Debezium engine completes its shutdown. It includes information about whether the prior execution was successful or, if it failed, the reason and the error
Heartbeat events
It’s possible to listen to heartbeat events in your Quarkus application:
import io.debezium.runtime.events.DebeziumHeartbeat;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class HeartbeatListener {
public void heartbeat(@Observes DebeziumHeartbeat heartbeat) {
//
}
}
The DebeziumHeartbeat contains information related to:
- Connector
- Debezium status
- partition
- offset
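Note that Debezium emits heartbeat messages only when a heartbeat interval is configured. Assuming the quarkus.debezium.* prefix mapping shown earlier, the corresponding property would look like the following (the exact key is an assumption derived from that mapping):

# assumed mapping of Debezium's heartbeat.interval.ms under the quarkus.debezium.* prefix
quarkus.debezium.heartbeat.interval.ms=10000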
Notification events
Debezium notifications provide events about fine-grained status (snapshot and streaming), always available as Jakarta CDI events:
import io.quarkus.debezium.notification.SnapshotEvent;
import io.quarkus.debezium.notification.DebeziumNotification;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class NotificationListener {
public void snapshot(@Observes SnapshotEvent event) {
//
}
public void notification(@Observes DebeziumNotification event) {
//
}
}
The following events are available:
- DebeziumNotification
- SnapshotStarted
- SnapshotInProgres
- SnapshotTableScanCompleted
- SnapshotAborted
- SnapshotSkipped
- SnapshotCompleted
- SnapshotPaused
- SnapshotResumed
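Since these notifications are ordinary CDI events, you can also observe a specific type, for instance to react once the initial snapshot has finished. The import below assumes the snapshot subtypes live in the same io.quarkus.debezium.notification package as SnapshotEvent:

import io.quarkus.debezium.notification.SnapshotCompleted; // assumed package, same as SnapshotEvent
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class SnapshotCompletedListener {

    public void onSnapshotCompleted(@Observes SnapshotCompleted event) {
        // e.g. mark the application as ready once the initial snapshot is done
    }
}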
Data Type Converter
It’s possible to define a Debezium Custom Converter in the extension using the @CustomConverter annotation and instantiating a ConverterDefinition that defines the type conversion:
import io.debezium.relational.CustomConverterRegistry.ConverterDefinition;
import io.debezium.runtime.CustomConverter;
import io.debezium.spi.converter.ConvertedField;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.SchemaBuilder;
@ApplicationScoped
public class StringConverter {
@CustomConverter
public ConverterDefinition<SchemaBuilder> bind(ConvertedField field) {
return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf);
}
}
This kind of conversion is applied to all the fields in CDC events.
To apply the conversion only to a subset of fields, it’s possible to enrich the CustomConverter with a FieldFilterStrategy that filters only the fields of interest:
@CustomConverter(filter = CustomFieldFilterStrategy.class)
public ConverterDefinition<SchemaBuilder> filteredBind(ConvertedField field) {
return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf);
}
@ApplicationScoped
public static class CustomFieldFilterStrategy implements FieldFilterStrategy {
@Override
public boolean filter(ConvertedField field) {
// your logic
return false;
}
}
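As a sketch, a filter strategy can match on the metadata exposed by ConvertedField (the originating data collection and the field name); the names below are purely illustrative:

@ApplicationScoped
public static class NameColumnFilterStrategy implements FieldFilterStrategy {

    @Override
    public boolean filter(ConvertedField field) {
        // apply the converter only to the "name" column of the products table
        // (the exact data collection format depends on the connector)
        return field.dataCollection().endsWith("products") && "name".equals(field.name());
    }
}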
Post Processors
Post processors apply lightweight, per-message changes earlier in the event flow than SMTs, allowing them to modify messages within Debezium’s context.
This makes them more efficient than transformations. It’s possible to define a post processor in two ways: as a configuration parameter or using the @PostProcessing annotation.
For configuration, the official documentation outlines the available parameters, such as those for the Reselect post-processor:
quarkus.debezium.post.processors=reselector
quarkus.debezium.post.processors.reselector.type=io.debezium.processors.reselect.ReselectColumnsPostProcessor
quarkus.debezium.post.processors.reselector.reselect.unavailable.values=true
quarkus.debezium.post.processors.reselector.reselect.null.values=true
quarkus.debezium.post.processors.reselector.reselect.use.event.key=false
quarkus.debezium.post.processors.reselector.reselect.error.handling.mode=WARN
For the code-based approach, the extension provides the @PostProcessing annotation, which gives access to the key and the Struct:
import io.debezium.runtime.PostProcessing;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.Struct;
@ApplicationScoped
public class PostProcessorHandler {
@PostProcessing
public void processing(Object key, Struct struct) {
// apply your logic
}
}
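As a sketch of what such a method might do, the following masks a column value when it is present in the event's Struct (the field name is illustrative and must exist in the Struct's schema):

@PostProcessing
public void maskSensitiveColumn(Object key, Struct struct) {
    // illustrative: overwrite a sensitive column when the change event contains it
    if (struct.schema().field("email") != null) {
        struct.put("email", "***");
    }
}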
DevService Support
Quarkus automatically provisions unconfigured services in dev and test modes using Dev Services. When an extension is included without configuration, Quarkus starts the necessary service (via Testcontainers) and connects it to your application. Debezium requires a database setup that the default Quarkus images do not support, so the extension ships with a Dev Service based on an image configured for change data capture. This support is experimental; in case of errors or issues you can disable it with the following property:
quarkus.datasource.devservices.enabled=false
or override it with an official Debezium image:
quarkus.datasource.devservices.image-name=quay.io/debezium/postgres:15