Debezium Extensions for Quarkus

Introduction

Debezium Extensions for Quarkus integrates the Debezium Runtime into Quarkus applications, enabling developers to consume change data capture (CDC) events from supported databases directly within a lightweight, cloud-native application.

When to use the extension

  • CDC in Quarkus: Seamlessly embeds Debezium into Quarkus apps, ideal for microservices use cases.

  • Kafka-free Option: Avoids the need for a full Kafka infrastructure when only simple streaming or in-process processing is required.

  • Lightweight & Fast: Leverages the fast startup time and low memory footprint of Quarkus and its native mode.

  • Developer Friendly: Simplifies setup with Quarkus-native configuration and dependency management.

Features

The extension supports well-established Debezium features and more. The following features are new or revisited:

  • Debezium Capturing Listener

  • Debezium Custom Deserializer

  • Debezium Lifecycle events

  • Debezium Heartbeat events

  • Debezium Custom Data Type Converter

  • Debezium Notification events

  • Debezium Post Processors

Supported Connectors

The Postgres connector is the only supported connector at this time.

Prerequisites

  • JDK 21+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.9.9

  • Quarkus version 3.25.0

  • Docker or Podman

  • Optionally Mandrel or GraalVM installed and configured appropriately if you want to build a native executable

Working Example

A fully functional example demonstrating the extension capabilities is available in the debezium-example repository. It can serve as a reference for setting up and testing your own Quarkus Application.

Bootstrap a Quarkus Project

You can skip this section if you already have a Quarkus application in place.

The easiest way to create a new Quarkus Project is to open a terminal and run the following command:

mvn io.quarkus.platform:quarkus-maven-plugin:3.25.0:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=getting-started

This command generates the following in ./getting-started:

  • the Quarkus boilerplate

  • the application configuration

Add the Extension to the project

In the project, add the Debezium extension dependency to your application’s pom.xml:

    <dependencies>
        <!-- [...] -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>quarkus-debezium-postgres</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <!-- [...] -->
    </dependencies>

Configuring Debezium Connector

The Debezium extension ships with a connector that listens for CDC events from the datasource, but a minimal configuration is required.

The extension relies on Quarkus (the SmallRye Config API) for all configuration mechanisms. Debezium configuration properties use the prefix quarkus.debezium.*. With an application configuration file located in src/main/resources/application.properties, a minimal setup looks like this:

# Debezium CDC configuration
quarkus.debezium.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
quarkus.debezium.name=native
quarkus.debezium.topic.prefix=native
quarkus.debezium.plugin.name=pgoutput
quarkus.debezium.snapshot.mode=initial

# datasource configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=<your username>
quarkus.datasource.password=<your password>
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/hibernate_orm_test
quarkus.datasource.jdbc.max-size=16

The available configuration parameters are listed in the Debezium documentation. You must also specify the datasource configuration parameters that the Debezium runtime requires.
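
As a rough illustration of how the quarkus.debezium.* prefix relates to the plain property names found in the Debezium documentation, the sketch below strips the prefix from a set of application properties. It assumes the extension forwards prefixed keys to Debezium unchanged apart from the prefix. DebeziumPrefixSketch and stripPrefix are hypothetical names used only for this example and are not part of the extension.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: NOT part of the extension, shown only to illustrate the prefix convention.
class DebeziumPrefixSketch {

    static final String PREFIX = "quarkus.debezium.";

    // Keeps only the keys that start with the prefix and returns them without it.
    static Map<String, String> stripPrefix(Map<String, String> applicationProperties) {
        Map<String, String> debeziumProperties = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : applicationProperties.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                debeziumProperties.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return debeziumProperties;
    }
}
```

Under that assumption, quarkus.debezium.snapshot.mode corresponds to the snapshot.mode property described in the Debezium documentation, while quarkus.datasource.* keys are left to Quarkus.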

Capturing events from Debezium

Continuing from the previous minimal configuration, your Quarkus application can receive CDC event payloads directly:

import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.runtime.Capturing;

@ApplicationScoped
public class ProductHandler {


    @Capturing
    public void capture(CapturingEvent<SourceRecord> record) {
        // process your events
    }

}

CapturingEvent<T> carries the kind of database operation, which you can match on:

    @Capturing
    public void capture(CapturingEvent<SourceRecord> record) {
        switch (record) {
            case Create<SourceRecord> event -> {}
            case Delete<SourceRecord> event -> {}
            case Message<SourceRecord> event -> {}
            case Read<SourceRecord> event -> {}
            case Truncate<SourceRecord> event -> {}
            case Update<SourceRecord> event -> {}
        }
    }
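
The switch above has no default branch, which only compiles when the set of event subtypes is closed. The following self-contained sketch shows the same pattern with stand-in types. ChangeEvent, its six records, and OperationSketch are hypothetical and only mirror the names used above; they are not the extension's classes. It requires Java 21 pattern matching for switch.

```java
// Hypothetical stand-ins for the extension's CapturingEvent<T> hierarchy.
sealed interface ChangeEvent<T> permits Create, Update, Delete, Read, Truncate, Message {
    T payload();
}

record Create<T>(T payload) implements ChangeEvent<T> {}
record Update<T>(T payload) implements ChangeEvent<T> {}
record Delete<T>(T payload) implements ChangeEvent<T> {}
record Read<T>(T payload) implements ChangeEvent<T> {}
record Truncate<T>(T payload) implements ChangeEvent<T> {}
record Message<T>(T payload) implements ChangeEvent<T> {}

class OperationSketch {

    // The compiler checks that every permitted subtype is handled, so no default branch is needed.
    static String describe(ChangeEvent<String> event) {
        return switch (event) {
            case Create<String> e -> "created " + e.payload();
            case Update<String> e -> "updated " + e.payload();
            case Delete<String> e -> "deleted " + e.payload();
            case Read<String> e -> "snapshot read " + e.payload();
            case Truncate<String> e -> "truncated";
            case Message<String> e -> "logical message";
        };
    }
}
```

With a sealed hierarchy, adding a new event subtype later turns every non-exhaustive switch into a compile error rather than a silently ignored case.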

Filtering Change Data Capture Events

It’s possible to filter events by destination:

    @Capturing(destination = "native.inventory.products")
    public void capture(CapturingEvent<SourceRecord> record) {
        // process your event
    }

By default, a Debezium connector destination is formed from the topic prefix defined in the configuration, the database or schema name (the schema name for PostgreSQL), and the name of the table in which the change was made, for example native.inventory.products. In some cases, the destination is redefined by a single message transformation (SMT).
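
With the minimal configuration shown earlier (topic prefix native), changes to a products table in the inventory schema arrive on native.inventory.products. A minimal sketch of that naming rule, assuming the PostgreSQL connector's prefix.schema.table convention and no SMT that reroutes events; DestinationSketch is a hypothetical helper, not part of the extension:

```java
// Hypothetical helper that composes a default destination name.
class DestinationSketch {

    // Joins the three segments with dots: <topic prefix>.<schema>.<table>
    static String destination(String topicPrefix, String schema, String table) {
        return String.join(".", topicPrefix, schema, table);
    }
}
```

For example, DestinationSketch.destination("native", "inventory", "products") gives the value used in the @Capturing annotation above.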

Deserializer via Jackson

Quarkus has built-in support for JSON serialization and deserialization based on Jackson. The extension provides an ObjectMapperDeserializer that can deserialize any data object via Jackson.

To use it, subclass the deserializer for your type. For example, create a ProductDeserializer that extends ObjectMapperDeserializer:

public class ProductDeserializer extends ObjectMapperDeserializer<Product> {
    public ProductDeserializer() {
        super(Product.class);
    }
}

Finally, configure your capture channel to use the Jackson deserializer for a particular destination:

quarkus.debezium.capturing.products.destination=native.inventory.products
quarkus.debezium.capturing.products.deserializer=com.acme.product.jackson.ProductDeserializer

Then use it in your code by specifying the destination assigned to the deserializer:

import io.debezium.runtime.Capturing;
import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class ProductHandler {


    @Capturing(destination = "native.inventory.products")
    public void capture(CapturingEvent<Product> record) {
        // process your events
    }

}

Alternatively, receive only the deserialized object without CapturingEvent<T>. Keep in mind that in this case you have no information about the database operation:

import io.debezium.runtime.Capturing;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class ProductHandler {


    @Capturing(destination = "native.inventory.products")
    public void capture(Product product) {
        // process your events
    }

}

Lifecycle events

You can track the status of Debezium by observing lifecycle events:

import io.debezium.runtime.events.*;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class LifecycleListener {

    public void started(@Observes ConnectorStartedEvent event) {
        // your logic
    }

    public void stopped(@Observes ConnectorStoppedEvent connectorStoppedEvent) {
        // your logic
    }

    public void tasksStarted(@Observes TasksStartedEvent tasksStartedEvent) {
        // your logic
    }

    public void tasksStopped(@Observes TasksStoppedEvent tasksStoppedEvent) {
        // your logic
    }

    public void pollingStarted(@Observes PollingStartedEvent pollingStartedEvent) {
        // your logic
    }

    public void pollingStopped(@Observes PollingStoppedEvent pollingStoppedEvent) {
        // your logic
    }

    public void completed(@Observes DebeziumCompletionEvent debeziumCompletionEvent) {
        // your logic
    }

}

The following events are available:

  • ConnectorStartedEvent is fired when Debezium starts a connector

  • ConnectorStoppedEvent is fired when Debezium stops a connector

  • TasksStartedEvent is fired when a connector task is started

  • TasksStoppedEvent is fired when a connector task is stopped

  • PollingStartedEvent is fired when the Debezium engine begins polling the connector for changes

  • PollingStoppedEvent is fired when the Debezium engine stops polling the connector for changes

  • DebeziumCompletionEvent is fired after the Debezium engine completes its shutdown. It reports whether the prior execution succeeded and, if it failed, the reason and the error
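
One possible use of these events is a simple "is capture running" flag, for instance to back a health check. The sketch below is self-contained and therefore uses empty stand-in classes named after two of the events; in a real application you would import the extension's event types, annotate the class with @ApplicationScoped, and mark each parameter with @Observes as in the listener above. CaptureReadiness is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-ins so the sketch compiles on its own; the real types come from the extension.
class PollingStartedEvent {}
class PollingStoppedEvent {}

// Hypothetical bean that remembers whether the engine is currently polling.
class CaptureReadiness {

    private final AtomicBoolean polling = new AtomicBoolean(false);

    // In a Quarkus application: void pollingStarted(@Observes PollingStartedEvent event)
    void pollingStarted(PollingStartedEvent event) {
        polling.set(true);
    }

    // In a Quarkus application: void pollingStopped(@Observes PollingStoppedEvent event)
    void pollingStopped(PollingStoppedEvent event) {
        polling.set(false);
    }

    boolean isPolling() {
        return polling.get();
    }
}
```

An AtomicBoolean is used because the sketch does not assume which thread delivers the events.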

Heartbeat events

You can listen for heartbeat events in your Quarkus application:

import io.debezium.runtime.events.DebeziumHeartbeat;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class HeartbeatListener {

    public void heartbeat(@Observes DebeziumHeartbeat heartbeat) {
        //
    }
}

DebeziumHeartbeat contains information about:

  • the connector

  • the Debezium status

  • the partition

  • the offset

Notification events

Debezium notifications report fine-grained status (snapshot and streaming) and are always available as Jakarta events:

import io.quarkus.debezium.notification.SnapshotEvent;
import io.quarkus.debezium.notification.DebeziumNotification;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class NotificationListener {

    public void snapshot(@Observes SnapshotEvent event) {
        //
    }

    public void notification(@Observes DebeziumNotification event) {
        //
    }
}

The following events are available:

  • DebeziumNotification

  • SnapshotStarted

  • SnapshotInProgress

  • SnapshotTableScanCompleted

  • SnapshotAborted

  • SnapshotSkipped

  • SnapshotCompleted

  • SnapshotPaused

  • SnapshotResumed

Data Type Converter

You can define a Debezium custom converter in the extension by annotating a method with @CustomConverter and returning a ConverterDefinition that defines the type conversion:

import io.debezium.relational.CustomConverterRegistry.ConverterDefinition;
import io.debezium.runtime.CustomConverter;
import io.debezium.spi.converter.ConvertedField;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.SchemaBuilder;

@ApplicationScoped
public class StringConverter {

    @CustomConverter
    public ConverterDefinition<SchemaBuilder> bind(ConvertedField field) {
        return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf);
    }
}

This conversion is applied to every field in the CDC events. To apply the conversion only to a subset of fields, add a FieldFilterStrategy to the @CustomConverter annotation to select the fields of interest:

    @CustomConverter(filter = CustomFieldFilterStrategy.class)
    public ConverterDefinition<SchemaBuilder> filteredBind(ConvertedField field) {
        return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf);
    }

    @ApplicationScoped
    public static class CustomFieldFilterStrategy implements FieldFilterStrategy {

        @Override
        public boolean filter(ConvertedField field) {
            // your logic
            return false;
        }

    }

Post Processors

Post processors apply lightweight, per-message changes earlier in the event flow than SMTs, allowing them to modify messages within Debezium’s context. This makes them more efficient than transformations. You can define a post processor in two ways: through configuration parameters or with the @PostProcessing annotation.

For configuration, the official documentation outlines the available parameters, such as those for the Reselect post-processor:

quarkus.debezium.post.processors=reselector
quarkus.debezium.post.processors.reselector.type=io.debezium.processors.reselect.ReselectColumnsPostProcessor
quarkus.debezium.post.processors.reselector.reselect.unavailable.values=true
quarkus.debezium.post.processors.reselector.reselect.null.values=true
quarkus.debezium.post.processors.reselector.reselect.use.event.key=false
quarkus.debezium.post.processors.reselector.reselect.error.handling.mode=WARN

In code, the extension provides the @PostProcessing annotation, which gives access to the key and the Struct:

import io.debezium.runtime.PostProcessing;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.Struct;

@ApplicationScoped
public class PostProcessorHandler {

    @PostProcessing
    public void processing(Object key, Struct struct) {
        // apply your logic
    }
}

Dev Services Support

Quarkus automatically provisions unconfigured services in dev and test modes using Dev Services. When an extension is included without configuration, Quarkus starts the necessary service (via Testcontainers) and connects it to your app. Debezium requires a database setup that the default Quarkus images don’t support, so the extension ships its own dev service with an image configured for change data capture. This support is experimental; if you run into errors or other issues, you can disable it with the following property:

quarkus.datasource.devservices.enabled=false

Alternatively, override the image with an official Debezium image:

quarkus.datasource.devservices.image-name=quay.io/debezium/postgres:15