Debezium Server

Please let us know if you encounter any problems while using this feature. Also, please reach out if you need Debezium Server to support a specific sink, or if you are interested in contributing the required implementation.

Debezium provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Amazon Kinesis, Google Cloud Pub/Sub, Apache Pulsar, Redis (Stream), or NATS JetStream. For streaming change events to Apache Kafka, it is recommended to deploy the Debezium connectors via Kafka Connect.

Installation

To install the server, download and unpack the server distribution archive.

A directory named debezium-server will be created with these contents:

debezium-server/
|-- CHANGELOG.md
|-- config
|-- CONTRIBUTE.md
|-- COPYRIGHT.txt
|-- debezium-server-3.3.1.Final-runner.jar
|-- lib
|-- LICENSE-3rd-PARTIES.txt
|-- LICENSE.txt
|-- README.md
`-- run.sh

The server is started using the run.sh script, dependencies are stored in the lib directory, and the config directory contains configuration files.

If you use the Oracle connector, you must add the Oracle JDBC driver to the lib directory (and, if using XStream, the XStream API files as well), as explained in: Obtaining the Oracle JDBC driver and XStream API files

Configuration

Debezium Server uses MicroProfile Configuration for configuration. This means that the application can be configured from disparate sources like configuration files, environment variables, system properties etc.
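As a minimal sketch of what "disparate sources" means in practice, the same setting can be supplied in the configuration file, as a JVM system property, or as an environment variable. The environment variable name below follows the MicroProfile Config mapping rule (non-alphanumeric characters replaced with underscores, then upper-cased); the value kinesis is only an illustration.

```properties
# config/application.properties
debezium.sink.type=kinesis

# Equivalent ways to supply the same value (shown as comments, not properties):
#   JVM system property:   -Ddebezium.sink.type=kinesis
#   Environment variable:  DEBEZIUM_SINK_TYPE=kinesis
```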

The main configuration file is config/application.properties. There are multiple sections configured:

  • debezium.source is for source connector configuration; each instance of Debezium Server runs exactly one connector

  • debezium.sink is for the sink system configuration

  • debezium.format is for the output serialization format configuration

  • debezium.transforms is for the configuration of message transformations

  • debezium.predicates is for the configuration of message transformation predicates

An example configuration file might look like this:

debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory

In this configuration file example:

  • The sink is set up for AWS Kinesis in region eu-central-1

  • The source connector is set up for PostgreSQL using the default Debezium decoderbufs plugin. If using PostgreSQL’s built-in pgoutput plugin, set debezium.source.plugin.name=pgoutput

  • The source connector is set to capture events from a schema named inventory. If you want to capture all changes in the database, remove this line. Otherwise, update this line to correspond to your preferred schema or tables.

  • The source offset will be stored in a file named offsets.dat in the data directory. Note that you might need to create this directory to prevent an error on startup.
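For illustration, the pgoutput variant mentioned in the list above would only add one line to the source section of the example. This is a fragment, not a complete configuration; the remaining debezium.source.* and debezium.sink.* settings stay as in the example file.

```properties
# PostgreSQL source using the built-in pgoutput logical decoding plugin
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.plugin.name=pgoutput
```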

When the server is started, it generates a sequence of log messages like this:

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-15 11:33:12,189 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = key
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,757 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = false

2020-05-15 11:33:12,763 INFO  [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
	access.control.allow.methods =
	access.control.allow.origin =
	admin.listeners = null
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = default
	config.providers = []
	connector.client.config.override.policy = None
	header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
	internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
	internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
	key.converter = class org.apache.kafka.connect.json.JsonConverter
	listeners = null
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	offset.flush.interval.ms = 0
	offset.flush.timeout.ms = 5000
	offset.storage.file.filename = data/offsets.dat
	offset.storage.partitions = null
	offset.storage.replication.factor = null
	offset.storage.topic =
	plugin.path = null
	rest.advertised.host.name = null
	rest.advertised.listener = null
	rest.advertised.port = null
	rest.extension.classes = []
	rest.host.name = null
	rest.port = 8083
	ssl.client.auth = none
	task.shutdown.graceful.timeout.ms = 5000
	topic.tracking.allow.reset = true
	topic.tracking.enable = true
	value.converter = class org.apache.kafka.connect.json.JsonConverter

2020-05-15 11:33:12,763 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = key
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,765 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,767 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.user = postgres
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.dbname = postgres
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.hostname = localhost
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.password = ********
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    name = kinesis
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    topic.prefix = tutorial
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.port = 5432
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    schema.include.list = inventory
2020-05-15 11:33:12,908 INFO  [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO  [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO  [io.quarkus] (main) Installed features: [cdi, smallrye-health]

Source configuration

The source configuration uses the same configuration properties that are described on the specific connector documentation pages (just with the debezium.source prefix), together with a few more specific ones that are necessary for running outside of Kafka Connect:

Property Default Description

The name of the Java class implementing the source connector.

org.apache.kafka.connect.storage.FileOffsetBackingStore

Class to use for storing and retrieving offsets for non-Kafka deployments. Available options:

  • org.apache.kafka.connect.storage.FileOffsetBackingStore for non-Kafka deployments

  • org.apache.kafka.connect.storage.MemoryOffsetBackingStore volatile store for test environments

  • io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore for database using JDBC

  • io.debezium.storage.redis.offset.RedisOffsetBackingStore for Redis deployments

If using a file offset store (default), the file in which connector offsets are stored for non-Kafka deployments.

Defines how frequently the offsets are flushed into the file.
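A hedged sketch of a file-based offset configuration follows. The file name and flush interval properties appear in the example configuration earlier in this document; the property name debezium.source.offset.storage for selecting the store class is an assumption here (the property column of the table did not survive), as is the 60000 ms interval.

```properties
# Assumed property name for selecting the offset store class
debezium.source.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
# File in which connector offsets are stored; the data directory may need to exist beforehand
debezium.source.offset.storage.file.filename=data/offsets.dat
# Flush offsets to the file once a minute (illustrative value)
debezium.source.offset.flush.interval.ms=60000
```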

(Optional) If using Redis to store offsets, an address, formatted as host:port, at which the Redis target streams are provided. If not supplied, will attempt to read debezium.sink.redis.address

(Optional) If using Redis to store offsets, a user name used to communicate with Redis. If the redis.address configuration is not supplied, and the redis.address is taken from the Redis sink, will attempt to load the value from debezium.sink.redis.user

(Optional) If using Redis to store offsets, a password (of respective user) used to communicate with Redis. A password must be set if a user is set. If the redis.address configuration is not supplied, and the redis.address is taken from the Redis sink, will attempt to load the value from debezium.sink.redis.password

(Optional) If using Redis to store offsets, whether or not to use SSL to communicate with Redis. If the redis.address configuration is not supplied, and the redis.address is taken from the Redis sink, will attempt to load the value from debezium.sink.redis.ssl.enabled. Default is 'false'

(Optional) If using Redis to store offsets, whether or not to enable hostname verification with Redis. If the redis.address configuration is not supplied, and the redis.address is taken from the Redis sink, will attempt to load the value from debezium.sink.redis.ssl.hostname.verification.enabled. Default is 'false'
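The fallback behaviour described above can be sketched as follows: when no Redis address is configured for the offset store, the connection settings are read from the Redis sink. The debezium.sink.redis.* names are the ones referenced in the descriptions; the property name debezium.source.offset.storage, the sink type value redis, and the address are assumptions for illustration.

```properties
# Assumed property name for selecting the Redis-backed offset store
debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore

# No offset-store-specific Redis address is set, so these sink settings are reused
# (sink type value "redis" is an assumption)
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
debezium.sink.redis.ssl.enabled=false
```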

(Optional) If using Redis to store offsets with SSL enabled, the path to the trust store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis to store offsets with SSL enabled, the password for the trust store file. If set, Redis connections will use this property over other configuration or system properties.

JKS

(Optional) If using Redis to store offsets with SSL enabled, the type of the trust store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis to store offsets with SSL enabled, the path to the key store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis to store offsets with SSL enabled, the password for the key store file. If set, Redis connections will use this property over other configuration or system properties.

JKS

(Optional) If using Redis to store offsets with SSL enabled, the type of the key store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis to store offsets, the hash key in Redis under which the offsets are stored. If the redis.key configuration is not supplied, the default value metadata:debezium:offsets is used.

false

If using Redis to store offsets, enables wait for replica. If Redis is configured with a replica shard, this makes it possible to verify that the data has been written to the replica. For more information see Redis WAIT command.

1000

If using Redis to store offsets, defines the timeout in milliseconds when waiting for replica. Must have a positive value.

false

If using Redis to store offsets, enables retry on wait for replica failure.

1000

If using Redis to store offsets, defines the delay of retry on wait for replica failure.

io.debezium.storage.kafka.history.KafkaSchemaHistory

Some of the connectors (e.g. MySQL, SQL Server, Db2, Oracle) track the database schema evolution over time and store this data in a database schema history. By default, this is based on Kafka. Other options are also available:

  • io.debezium.storage.file.history.FileSchemaHistory for non-Kafka deployments

  • io.debezium.relational.history.MemorySchemaHistory volatile store for test environments

  • io.debezium.storage.redis.history.RedisSchemaHistory for Redis deployments

  • io.debezium.storage.rocketmq.history.RocketMqSchemaHistory for RocketMQ deployments

  • io.debezium.storage.azure.blob.history.AzureBlobSchemaHistory for Azure Blob Storage deployments

The name and location of the file to which FileSchemaHistory persists its data.
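As a sketch, a MySQL source that keeps its schema history in a local file might be configured like this. The debezium.source.schema.history.internal prefix appears elsewhere in this document; the exact property names for selecting the class and for the file name are assumptions (the property column of the table did not survive), and the path is illustrative.

```properties
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
# Assumed property name for selecting the schema history implementation
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
# Assumed property name for the file that FileSchemaHistory persists to
debezium.source.schema.history.internal.file.filename=data/schema_history.dat
```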

The Redis host:port to connect to if using RedisSchemaHistory.

The Redis user to use if using RedisSchemaHistory.

The Redis password to use if using RedisSchemaHistory.

Use SSL connection if using RedisSchemaHistory.

Enable hostname verification if using RedisSchemaHistory.

(Optional) If using Redis to store schema history with SSL enabled, the path to the trust store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis to store schema history with SSL enabled, the password for the trust store file. If set, Redis connections will use this property over other configuration or system properties.

JKS

(Optional) If using Redis to store schema history with SSL enabled, the type of the trust store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis to store schema history with SSL enabled, the path to the key store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis to store schema history with SSL enabled, the password for the key store file. If set, Redis connections will use this property over other configuration or system properties.

JKS

(Optional) If using Redis to store schema history with SSL enabled, the type of the key store file. If set, Redis connections will use this property over other configuration or system properties.

The Redis key to use for storage if using RedisSchemaHistory. Default: metadata:debezium:schema_history

The initial delay in case of a connection retry to Redis if using RedisSchemaHistory. Default: 300 (ms)

The maximum delay in case of a connection retry to Redis if using RedisSchemaHistory. Default: 10000 (ms)

The maximum number of attempts to connect to Redis. Default: 10

Connection timeout of Redis client if using RedisSchemaHistory. Default: 2000 (ms)

Socket timeout of Redis client if using RedisSchemaHistory. Default: 2000 (ms)

false

If using Redis to store schema history, enables wait for replica. If Redis is configured with a replica shard, this makes it possible to verify that the data has been written to the replica. For more information see Redis WAIT command.

1000

If using Redis to store schema history, defines the timeout in milliseconds when waiting for replica. Must have a positive value.

false

If using Redis to store schema history, enables retry on wait for replica failure.

1000

If using Redis to store schema history, defines the delay of retry on wait for replica failure.

The name of the RocketMQ topic for the database schema history.

localhost:9876

RocketMQ service discovery NameServer address configuration.

false

RocketMQ access control enable configuration, default is 'false'.

RocketMQ access key. If debezium.source.schema.history.internal.rocketmq.acl.enabled is true, the value cannot be empty.

RocketMQ secret key. If debezium.source.schema.history.internal.rocketmq.acl.enabled is true, the value cannot be empty.

60

The maximum number of attempts to recover database schema history.

1000

The number of milliseconds to wait while polling for persisted data during recovery.

60000

Timeout for sending messages to RocketMQ.

Azure Blob Storage account connection string.

Azure Blob Storage account name. This should be set if and only if debezium.source.schema.history.internal.azure.storage.account.connectionstring is empty, which will then use Azure Active Directory authentication.

Azure Blob Storage account container name.

Azure Blob Storage blob name that persists schema history data.

Format configuration

The message output format can be configured for both key and value separately. By default the output is in JSON format but an arbitrary implementation of Kafka Connect’s Converter can be used.

Property Default Description

json

The name of the output format for key, one of json/jsonbytearray/avro/protobuf/simplestring/binary.

Configuration properties passed to the key converter.

json

The name of the output format for value, one of json/jsonbytearray/avro/protobuf/cloudevents/simplestring/binary.

Configuration properties passed to the value converter.

json

The name of the output format for header, one of json/jsonbytearray.

Configuration properties passed to the header converter.
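A minimal sketch of a format section, assuming the property names debezium.format.key and debezium.format.value (derived from the debezium.format prefix named in the Configuration section; the property column of the table did not survive). The last line illustrates pass-through to the value converter, here the JSON converter's schemas.enable option that is visible in the startup log above.

```properties
# Assumed property names under the debezium.format prefix
debezium.format.key=json
debezium.format.value=json
# Passed to the value converter: emit plain JSON payloads without the embedded schema
debezium.format.value.schemas.enable=false
```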

Transformation configuration

Before the messages are delivered to the sink, they can run through a sequence of transformations. The server supports single message transformations defined by Kafka Connect. The configuration will need to contain the list of transformations, implementation class for each transformation and configuration options for each of the transformations.

Property Default Description

debezium.transforms

The comma separated list of symbolic names of transformations.

debezium.transforms.<name>.type

The name of Java class implementing the transformation with name <name>.

debezium.transforms.<name>.*

Configuration properties passed to the transformation with name <name>.

debezium.transforms.<name>.predicate

The name of the predicate to be applied to the transformation with name <name>.

debezium.transforms.<name>.negate

false

Determines if the result of the predicate to the transformation with name <name> will be negated.
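The properties in this table could be combined as in the following sketch, which registers a single transformation under the symbolic name unwrap. The choice of Debezium's ExtractNewRecordState SMT and its add.fields option is only an example; any Kafka Connect single message transformation on the class path could be used instead.

```properties
# One transformation, registered under the symbolic name "unwrap"
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Option passed through to the transformation itself
debezium.transforms.unwrap.add.fields=op,table
```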

Predicates configuration

A Predicate can be associated with a transformation in order to make the transformation optional. The server supports Filter and Conditional SMTs defined by Kafka Connect. The configuration will need to contain the list of predicates, implementation class for each predicate and configuration options for each of the predicates.

Property Default Description

debezium.predicates

The comma separated list of symbolic names of predicates.

debezium.predicates.<name>.type

The name of Java class implementing the predicate with name <name>.

debezium.predicates.<name>.*

Configuration properties passed to the predicate with name <name>.
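A sketch of a predicate attached to a transformation follows. It uses Kafka Connect's TopicNameMatches predicate so that the transformation is applied only to topics of the inventory schema from the earlier example; the symbolic names inventoryTopics and unwrap are hypothetical. Note that backslashes must be doubled in a properties file.

```properties
# Predicate that matches destinations such as tutorial.inventory.customers
debezium.predicates=inventoryTopics
debezium.predicates.inventoryTopics.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
debezium.predicates.inventoryTopics.pattern=tutorial\\.inventory\\..*

# Transformation that runs only when the predicate matches
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.predicate=inventoryTopics
debezium.transforms.unwrap.negate=false
```

Setting negate to true would instead apply the transformation to every destination that does not match the pattern.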

Asynchronous Engine Properties

By default, Debezium Server uses the asynchronous embedded engine (AsyncEmbeddedEngine) as its processing engine. You can configure the following options for the asynchronous embedded engine:

Property Default Description

Threads allocated on demand, based on the workload and the number of available CPU cores.

The number of threads that are available to process change event records. If no value is specified (the default), the engine uses the Java ThreadPoolExecutor to dynamically adjust the number of threads based on the current workload; the maximum number of threads is the number of CPU cores on the machine. If a value is specified, the engine uses the Java fixed thread pool method to create a thread pool with the specified number of threads. To use all available cores on the machine, set the placeholder value AVAILABLE_CORES.

1000

Maximum time, in milliseconds, that the engine allows for processing pending records after a task shutdown is called.

180,000 (3 min)

Time, in milliseconds, that the engine waits for a task’s lifecycle management operations (starting and stopping) to complete.

Additional configuration

Debezium Server runs on top of the Quarkus framework. All configuration options exposed by Quarkus are available in Debezium Server too. The most frequently used are:

Property Default Description

quarkus.http.port

8080

The port on which Debezium exposes the MicroProfile Health endpoint and other status information. With the default port, health can be accessed at http://host:8080/q/health.

quarkus.log.level

INFO

The default log level for every log category.

quarkus.log.console.json

true

Determine whether to enable the JSON console formatting extension, which disables "normal" console formatting.

JSON logging can be disabled by setting quarkus.log.console.json=false in the config/application.properties file, as demonstrated in the config/application.properties.example file.
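For example, the three Quarkus properties above could be overridden in config/application.properties as follows; the port number and log level are illustrative values.

```properties
# Expose the health endpoint on port 8081 instead of the default 8080
quarkus.http.port=8081
# Raise the default log level for every log category
quarkus.log.level=DEBUG
# Switch from JSON console output to plain-text console formatting
quarkus.log.console.json=false
```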

Enabling message filtering

Debezium Server provides message filtering through SMTs (Single Message Transformations). See Message Filtering for more details. However, for security reasons, filtering is not enabled by default and must be explicitly enabled when Debezium Server is started. To enable it, set the environment variable ENABLE_DEBEZIUM_SCRIPTING to true. This adds the debezium-scripting jar file and the JSR 223 implementation jar files (currently Groovy and graalvm.js) to the server class path. These jar files are contained in the opt_lib directory of the Debezium Server distribution.
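Once scripting is enabled, a filter could be configured like any other transformation. The following is a sketch using Debezium's Filter SMT with a Groovy expression; the symbolic name filter and the condition (keep only update events) are illustrative choices.

```properties
# Requires the server to be started with ENABLE_DEBEZIUM_SCRIPTING=true
debezium.transforms=filter
debezium.transforms.filter.type=io.debezium.transforms.Filter
debezium.transforms.filter.language=jsr223.groovy
# Groovy expression evaluated per record; records for which it is false are dropped
debezium.transforms.filter.condition=value.op == 'u'
```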

Sink configuration

Sink configuration is specific for each sink type.

The sink is selected by configuration property debezium.sink.type.

Amazon Kinesis

Amazon Kinesis is a data streaming system with support for stream sharding and other techniques for high scalability. Kinesis exposes a set of REST APIs and provides SDKs for several languages, including the Java SDK that is used to implement the sink.

Property Default Description

Must be set to kinesis.

A region name in which the Kinesis target streams are provided.

endpoint determined by the AWS SDK

(Optional) An endpoint url at which the Kinesis target streams are provided.

(Optional) A credentials profile name used to communicate with the Amazon API through the default credential profiles file. If not present, the default credentials provider chain is used. It looks for credentials in the following order: environment variables, Java system properties, web identity token credentials, default credential profiles file, Amazon ECS container credentials, and instance profile credentials.

default

Kinesis does not support messages without a key, so this string is used as the message key for messages from tables without a primary key.

Injection points

The behaviour of the Kinesis sink can be modified by custom logic that provides alternative implementations for specific functionalities. When no alternative implementation is available, the default one is used.

Interface CDI classifier Description

@CustomConsumerBuilder

Custom configured instance of a KinesisClient used to send messages to target streams.

Custom implementation maps the planned destination (topic) name into a physical Kinesis stream name. By default the same name is used.

Google Cloud Pub/Sub

Google Cloud Pub/Sub is a messaging/eventing system designed for scalable batch and stream processing applications. Pub/Sub exposes a set of REST APIs and provides SDKs for several languages, including the Java SDK that is used to implement the sink.

Property Default Description

Must be set to pubsub.

system-wide default project id

A project name in which the target topics are created.

true

Pub/Sub can optionally use a message key to guarantee that messages with the same ordering key are delivered in the order in which they were sent. This feature can be disabled.

default

Tables without a primary key send messages with a null key. This is not supported by Pub/Sub, so a surrogate key must be used.

100

The maximum amount of time to wait to reach element count or request bytes threshold before publishing outstanding messages to Pub/Sub.

100L

Once this many messages are queued, send all of the messages in a single call, even if the delay threshold hasn’t elapsed yet.

10000000L

Once the number of bytes in the batched request reaches this threshold, send all of the messages in a single call, even if neither the delay nor the message count threshold has been exceeded yet.

false

When enabled, configures your publisher client with flow control to limit the rate of publish requests.

Long.MAX_VALUE

(Optional) If flow control is enabled, the maximum number of messages before messages are blocked from being published.

Long.MAX_VALUE

(Optional) If flow control is enabled, the maximum number of bytes before messages are blocked from being published.

60000

The total timeout for a call to publish (including retries) to Pub/Sub.

5

The initial amount of time to wait before retrying the request.

2.0

The previous wait time is multiplied by this multiplier to come up with the next wait time, until the max is reached.

Long.MAX_VALUE

The maximum amount of time to wait before retrying. i.e. after this value is reached, the wait time will not increase further by the multiplier.

10000

Controls the timeout for the initial Remote Procedure Call (RPC).

2.0

The previous RPC timeout is multiplied by this multiplier to come up with the next RPC timeout value, until the max is reached.

10000

The max timeout for individual publish requests to Cloud Pub/Sub.

30000

The maximum time to wait when retrieving the results of publish requests to Cloud Pub/Sub.

0

Number of threads used by the client library to publish messages. Disabled when set to 0.

-1

Threshold in bytes above which messages are compressed for transport. Disabled when set to -1.

The address of the Pub/Sub emulator. Only to be used in a dev or test environment with the Pub/Sub emulator. Unless this value is set, debezium-server will connect to a cloud Pub/Sub instance running in a GCP project, which is the desired behavior in a production environment.

The Google Cloud region to connect to (e.g., us-central1, asia-northeast1). When specified, Debezium will use the locational endpoint for Pub/Sub in the format {region}-pubsub.googleapis.com:443. This allows connecting to locational endpoints instead of the global endpoint. Note that this parameter is ignored if debezium.sink.pubsub.address is specified.
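As a sketch of a local test setup against the Pub/Sub emulator: the sink type value and debezium.sink.pubsub.address are named in this section, while the property name debezium.sink.pubsub.project.id, the project name, and the emulator address are assumptions for illustration.

```properties
debezium.sink.type=pubsub
# Assumed property name and a hypothetical project
debezium.sink.pubsub.project.id=my-test-project
# Emulator address for dev/test only; when set, the region setting is ignored
debezium.sink.pubsub.address=localhost:8085
```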

Injection points

The behaviour of the Pub/Sub sink can be modified by custom logic that provides alternative implementations for specific functionalities. When no alternative implementation is available, the default one is used.

Interface CDI classifier Description

@CustomConsumerBuilder

A class that provides custom configured instance of a Publisher used to send messages to a dedicated topic.

Custom implementation maps the planned destination (topic) name into a physical Pub/Sub topic name. By default the same name is used.

Pub/Sub Lite

Google Cloud Pub/Sub Lite is a cost-effective alternative to Google Cloud Pub/Sub. Pub/Sub exposes a set of REST APIs and provides SDKs for several languages, including the Java SDK that is used to implement the sink.

Property Default Description

Must be set to pubsublite.

system-wide default project id

A project name or project id in which the target topics are created.

Region where the topics are being created. Example: us-east1-b.

true

Pub/Sub Lite can optionally use a message key to guarantee that messages with the same key are delivered to the same partition. This feature can be disabled.

default

Tables without a primary key send messages with a null key. This is not supported by Pub/Sub Lite, so a surrogate key must be used.

30000

The maximum time to wait when retrieving the results of publish requests to Cloud Pub/Sub.

Injection points

The behaviour of the Pub/Sub Lite sink can be modified by custom logic that provides alternative implementations for specific functionalities. When no alternative implementation is available, the default one is used.

Interface CDI classifier Description

@CustomConsumerBuilder

A class that provides custom configured instance of a Publisher used to send messages to a dedicated topic.

Custom implementation maps the planned destination (topic) name into a physical Pub/Sub Lite topic name. By default the same name is used.

HTTP Client

The HTTP Client streams changes to any HTTP server for additional processing; the original design goal was to have Debezium act as a Knative Event Source. The HTTP Client sink supports optional JSON Web Token (JWT) authentication.

Property Default Description

Must be set to http.

The HTTP Server URL to stream events to. This can also be set by defining the K_SINK environment variable, which is used by the Knative source framework.

60000

The number of milliseconds to wait for a response from the server before timing out (default of 60 s).

5

The number of retries before an exception is thrown (default 5 times).

1000

The number of milliseconds to wait before another attempt to send record is made after failure (default of 1s).

X-DEBEZIUM-

Headers will be prefixed with this value (defaults to X-DEBEZIUM-).

true

Header values will be base64 encoded (defaults to true).

Specifies the type of authentication the HTTP client sink uses when connecting to an HTTP server. Supports one of the following options:

  • jwt for JSON Web Token (JWT) authentication

  • standard-webhooks for Standard Webhooks

If you omit this property, the HTTP client sink does not use authentication headers for the connection.

Specifies the username for JWT authentication.

Specifies the password for JWT authentication.

Specifies the base URL (e.g., http://myserver:8000/) for JWT authentication. The paths auth/authenticate and auth/refreshToken are appended for the JWT initial authentication and token refresh REST requests.

Requested duration (in minutes) before the authentication token expires.

Requested duration (in minutes) before the refresh token expires.

The webhooks signing secret that Debezium uses to generate HMAC-SHA256 signatures for the webhook request. The secret must be Base64-encoded, with a size from 24 bytes to 64 bytes (192–512 bits). Optionally, you can add the prefix whsec_ to the secret to help distinguish it from other types of keys or tokens. For more information about implementing or validating webhook signatures, see the Standard Webhooks specification.

Apache Pulsar

Apache Pulsar is a high-performance, low-latency server for server-to-server messaging. Pulsar exposes REST APIs and a native endpoint, and provides clients for several languages, including the Java client that is used to implement the sink.

Property Default Description

Must be set to pulsar.

0

Configures timeout in milliseconds for sending a batch of messages to Pulsar and waiting for the producer to flush and persist all of them. By default it is set to 0 which means no timeout. Make sure that maxPendingMessages and blockIfQueueFull are configured properly on the producer.

The Pulsar module supports pass-through configuration. The client configuration properties are passed to the client with the prefix removed. At least serviceUrl must be provided.

The Pulsar module supports pass-through configuration. The message producer configuration properties are passed to the producer with the prefix removed. The topic is set by Debezium.

DEFAULT

Specifies the batcher builder for the producer. The producer uses the batcher builder to create a batch message container. This setting is applicable only when batching is enabled. Valid options are DEFAULT or KEY_BASED, which is used for KeyShared subscriptions.

default

Tables without a primary key produce messages with a null key. Pulsar does not support null keys, so this value is used as a surrogate key.

public

The target tenant used to deliver the message.

default

The target namespace used to deliver the message.
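
Pass-through configuration, as described for the client and producer properties above, amounts to selecting the properties that start with a given prefix and removing that prefix. The Python sketch below illustrates the idea only. The prefixes debezium.sink.pulsar.client. and debezium.sink.pulsar.producer. are assumptions (the property names are not shown in this section), and pass_through is a hypothetical helper, not part of Debezium.

```python
def pass_through(config: dict, prefix: str) -> dict:
    """Return the properties that start with prefix, with the prefix removed."""
    return {
        name[len(prefix):]: value
        for name, value in config.items()
        if name.startswith(prefix)
    }


# Example properties; the prefixed names are assumed for illustration.
config = {
    "debezium.sink.type": "pulsar",
    "debezium.sink.pulsar.client.serviceUrl": "pulsar://localhost:6650",
    "debezium.sink.pulsar.producer.batchingEnabled": "true",
}

client_config = pass_through(config, "debezium.sink.pulsar.client.")
producer_config = pass_through(config, "debezium.sink.pulsar.producer.")

# The text above requires at least serviceUrl for the client.
if "serviceUrl" not in client_config:
    raise ValueError("serviceUrl must be provided")
```

The same prefix-stripping idea applies to the other sinks in this document that support pass-through configuration.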

Injection points

The Pulsar sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

Custom implementation maps the planned destination (topic) name into a physical Pulsar topic name. By default the same name is used.

Azure Event Hubs

Azure Event Hubs is a big data streaming platform and event ingestion service that can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.

Property Default Description

Must be set to eventhubs.

Connection string required to communicate with Event Hubs. The format is: Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>

Name of the Event Hub

(Optional) The identifier of the Event Hub partition that the events will be sent to. Use this if you want all the change events received by Debezium to be sent to a specific partition in Event Hubs. Do not use if you have specified debezium.sink.eventhubs.partitionkey

(Optional) The partition key will be used to hash the events. Use this if you want all the change events received by Debezium to be sent to a specific partition in Event Hubs. Do not use if you have specified debezium.sink.eventhubs.partitionid

Sets the maximum size for the batch of events, in bytes.

No default value

(Optional) Specifies the hash function that Debezium uses to hash Azure Event Hubs message keys.

Specify one of the following values:

  • java

  • md5

  • sha1

  • sha256

Using partitions in EventHubs

By default, when neither of the optional debezium.sink.eventhubs.partitionid or debezium.sink.eventhubs.partitionkey properties is defined, the EventHubs sink will send events round-robin to all available partitions.

You can enforce all messages to be sent to a single, fixed, partition by setting the debezium.sink.eventhubs.partitionid property. Alternatively, you can use the debezium.sink.eventhubs.partitionkey property to specify a fixed partition key that EventHubs will use to route all events to a specific partition.

If you have more specific routing requirements, you can use the Partition Routing transformer. Ensure that the number of partitions specified in the transformer’s partition.topic.num setting is less than or equal to the number of partitions available in your EventHubs namespace, so that events cannot be routed to non-existing partition IDs. As an example, to route all events to 5 partitions based on their source schema name, you can set the following in your application.properties:

# Uses a hash of `source.db` to calculate which partition to send the event to. Ensures all events from the same source schema are sent to the same partition.
debezium.transforms=PartitionRouter
debezium.transforms.PartitionRouter.type=io.debezium.transforms.partitions.PartitionRouting
debezium.transforms.PartitionRouter.partition.payload.fields=source.db
debezium.transforms.PartitionRouter.partition.topic.num=5
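
The effect of this configuration can be pictured as hashing the chosen field values and taking the result modulo the partition count. The Python sketch below illustrates that idea only: CRC32 is a stand-in hash chosen for the example, not the function Debezium uses, and route is a hypothetical helper.

```python
import zlib


def route(field_values, partition_count: int) -> int:
    """Map field values to a partition number in [0, partition_count)."""
    joined = "|".join(str(value) for value in field_values)
    # CRC32 is a stand-in; any stable hash gives the same routing guarantee.
    return zlib.crc32(joined.encode("utf-8")) % partition_count


# Events with the same source.db value always land on the same partition.
first = route(["inventory"], 5)
second = route(["inventory"], 5)
```

Because the result is always smaller than the partition count, keeping partition.topic.num at or below the number of available partitions guarantees that every computed partition ID exists.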

Injection points

The default sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

@CustomConsumerBuilder

Custom configured instance of an EventHubProducerClient used to send messages.

Redis (Stream)

Redis is an open source (BSD licensed) in-memory data structure store, used as a database, cache and message broker. The Stream is a data type which models a log data structure in a more abstract way. It implements powerful operations to overcome the limitations of a log file.

Property Default Description

Must be set to redis.

An address, formatted as host:port, at which the Redis target streams are provided.

0

A number in the range 0..15 used for selecting the database to work with. Default is database 0. This feature is only available for standalone Redis connections; Redis clusters use only database 0.

(Optional) A user name used to communicate with Redis.

(Optional) A password (of respective user) used to communicate with Redis. A password must be set if a user is set.

false

(Optional) A Boolean value that specifies whether connections to Redis require SSL.

false

(Optional) A Boolean value that specifies whether connections to Redis should verify the hostname of the server.

(Optional) If using Redis sink with SSL enabled, the path to the trust store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis sink with SSL enabled, the password for the trust store file. If set, Redis connections will use this property over other configuration or system properties.

JKS

(Optional) If using Redis sink with SSL enabled, the type of the trust store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis sink with SSL enabled, the path to the key store file. If set, Redis connections will use this property over other configuration or system properties.

(Optional) If using Redis sink with SSL enabled, the password for the key store file. If set, Redis connections will use this property over other configuration or system properties.

JKS

(Optional) If using Redis sink with SSL enabled, the type of the key store file. If set, Redis connections will use this property over other configuration or system properties.

default

Redis does not support records without a key, so this string is used as the key for records without a primary key.

default

Redis does not support the notion of null payloads, as is the case with tombstone events. So this string will be used as value for records without a payload.

500

Number of change records to insert in a single batch write (Pipelined transaction).

300

Initial retry delay when encountering Redis connection or OOM issues. This value is doubled on every retry but does not exceed debezium.sink.redis.retry.max.delay.ms.

10000

Max delay when encountering Redis connection or OOM issues.

2000

Connection timeout for Redis client.

2000

Socket timeout for Redis client.

false

Enables wait for replica. If Redis is configured with a replica shard, this option verifies that the data has been written to the replica. For more information, see the Redis WAIT command.

1000

Timeout in milliseconds when waiting for replica. Must have a positive value.

false

Enables retry on wait for replica failure.

1000

Delay of retry on wait for replica failure.

compact

The format of the message sent to the Redis stream. Possible values are extended (the newer format) and compact (the older format, used until now). Read more about the message format below.

85

The sink will stop consuming records if the used_memory percentage (out of Redis configured maxmemory) is higher or equal to this threshold. If the configured value is 0 then this threshold is disabled.

0

If Redis maxmemory is not available or 0, the debezium.sink.redis.memory.threshold.percentage will apply to this value (if this value is positive). By default it is 0 (disabled).

true

Determines whether heartbeat messages from Debezium connectors should be skipped (not stored in Redis). When set to true (default), heartbeat messages are marked as processed but not stored in Redis streams. When set to false, heartbeat messages are stored in Redis streams alongside regular CDC events.
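
The retry behaviour described in the table (an initial delay that doubles on every retry, capped at a maximum) can be sketched in a few lines of Python. This is an illustration of the described schedule under the default values of 300 and 10000, not the sink's actual code, and retry_delays is a hypothetical helper.

```python
def retry_delays(initial_ms: int = 300, max_ms: int = 10000, attempts: int = 8):
    """Return the delay, in milliseconds, to wait before each retry attempt."""
    delays = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(min(delay, max_ms))
        delay = min(delay * 2, max_ms)  # double, but never exceed the cap
    return delays


schedule = retry_delays()
# schedule is [300, 600, 1200, 2400, 4800, 9600, 10000, 10000]
```

With the defaults, the cap is reached on the seventh retry, after which every retry waits the maximum delay.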

Message Format

The debezium.sink.redis.message.format property described above selects one of two message formats, which look like this in Redis:

  • the extended format, using two pairs {1), 2)}={"key", "message key"} and {3), 4)}={"value", "message value"}:

1) 1) "1639304527499-0"
   2) 1) "key"
      2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
      3) "value"
      4) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": 
\"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": 
\"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"

  • and the compact format, using only one pair {1), 2)}={"message key", "message value"}:

1) 1) "1639304527499-0"
   2) 1) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
      2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": 
\"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": 
\"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"

For more information, see the Redis Streams documentation.
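
A consumer has to unpack the two formats differently. The Python sketch below assumes that the client library returns the fields of one stream entry as a dictionary of strings. The unpack function and the shortened sample documents are hypothetical and only illustrate the difference between the formats shown above.

```python
import json


def unpack(fields: dict, message_format: str):
    """Return (key, value) as parsed JSON for one stream entry."""
    if message_format == "extended":
        # Two pairs: "key" -> message key, "value" -> message value.
        raw_key, raw_value = fields["key"], fields["value"]
    elif message_format == "compact":
        # One pair: the field name is the message key itself.
        if len(fields) != 1:
            raise ValueError("compact entries hold exactly one pair")
        raw_key, raw_value = next(iter(fields.items()))
    else:
        raise ValueError(f"unknown message format: {message_format}")
    return json.loads(raw_key), json.loads(raw_value)


# Shortened sample documents (the real ones also carry a schema section).
sample_key = '{"payload": {"empno": 11}}'
sample_value = '{"payload": {"op": "d", "after": null}}'

extended = unpack({"key": sample_key, "value": sample_value}, "extended")
compact = unpack({sample_key: sample_value}, "compact")
```

Both calls return the same key and value, which shows that the formats differ only in how the pair is laid out inside the stream entry.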

Injection points

The Redis sink behavior can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

Custom implementation maps the planned destination (topic) name into a physical Redis stream name. By default the same name is used.

NATS Streaming

NATS Streaming is a data streaming system powered by NATS, and written in the Go programming language.

Property Default Description

Must be set to nats-streaming.

URL (or comma separated list of URLs) to a node or nodes in the cluster formatted as nats://host:port.

NATS Streaming Cluster ID.

NATS Streaming Client ID.

Injection points

The NATS Streaming sink behavior can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

@CustomConsumerBuilder

Custom configured instance of a StreamingConnection used to publish messages to target subjects.

Custom implementation maps the planned destination (topic) name into a physical NATS Streaming subject name. By default the same name is used.

NATS JetStream

NATS has a built-in distributed persistence system called JetStream which enables new functionalities and higher qualities of service on top of the base 'Core NATS' functionalities and qualities of service.

Property Default Description

Must be set to nats-jetstream.

URL (or comma separated list of URLs) to a node or nodes in the cluster formatted as nats://host:port.

If true, a basic stream will be created.

*.*.*

A comma-separated list of subjects (messaging channel names). Subjects can contain wildcards, for example test.inventory.*
Important: To capture both schema change events and data change events, you must specify both the topic prefix and a wildcard pattern. For example, if your debezium.source.topic.prefix is myapp, configure subjects as myapp,myapp.> or myapp,myapp.*.*
- Schema change events (DDL) are published to the exact topic prefix (e.g., myapp), which is not matched by any subject that has a subtopic wildcard
- Data change events are published to table-specific subjects (e.g., myapp.database.table)
- Use myapp.> to match one or more subject levels after the prefix, or myapp.*.* to match exactly two levels after the prefix

memory

Controls how the messages are saved in the stream. Can be memory or file.

No default value

Specifies the identity of the NATS server client. Add this property to the configuration to enable JSON Web Token (JWT) authentication with NATS. To use JWT authentication with NATS, you must specify the NKey seed. Do not enable JWT authentication if password authentication is enabled.

No default value

When JWT authentication is enabled for NATS, use this property to specify the NKey seed that represents the Debezium user. Debezium uses the specified NKey seed to derive a private key. It then uses the private key to cryptographically sign the nonce challenge that the NATS server issues during the authentication process. Debezium returns the signed nonce to the server, along with the public key for the specified debezium.sink.nats-jetstream.auth.jwt client.

No default value

Specifies the username of the authorized NATS user.
When this property is present in the configuration, password authentication with NATS is enabled.
To use password authentication with NATS, specify a debezium.sink.nats-jetstream.auth.password. Do not enable password authentication if JWT authentication is enabled.

No default value

Specifies the password to use when password authentication is enabled.

true

(Optional) A Boolean value that specifies whether Debezium can stream asynchronously to a NATS JetStream server.

5000

(Optional) Specifies the maximum time, in milliseconds, that Debezium waits for acknowledgment from the NATS server after it sends a batch of messages for asynchronous processing. During asynchronous processing, each message is published with a timeout specified by asyncTimeoutMs.

If you need a more configurable stream, you can create it with the NATS CLI. For more information about streams, see https://docs.nats.io/nats-concepts/jetstream/streams
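
The subject matching rules that matter for the subjects property can be sketched in Python. The sketch follows the NATS wildcard rules (an asterisk matches exactly one level, and a trailing greater-than sign matches one or more levels). The function names are hypothetical, and the code is an illustration rather than the NATS implementation.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Check whether a subject matches a pattern with * and > wildcards."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for index, token in enumerate(pattern_tokens):
        if token == ">":
            # Must be the last token and must cover at least one level.
            return index == len(pattern_tokens) - 1 and len(subject_tokens) > index
        if index >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[index]:
            return False
    return len(pattern_tokens) == len(subject_tokens)


def stream_captures(subjects: str, subject: str) -> bool:
    """Check a subject against a comma-separated list of patterns."""
    return any(subject_matches(p.strip(), subject) for p in subjects.split(","))
```

With the prefix myapp, the pattern myapp.> matches myapp.database.table but not myapp itself, which is why the bare prefix has to be listed separately to capture schema change events.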

Injection points

The NATS JetStream sink behavior can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

@CustomConsumerBuilder

Custom configured instance of a JetStream used to publish messages to target subjects.

Custom implementation maps the planned destination (topic) name into a physical NATS JetStream subject name. By default the same name is used.

Apache Kafka

Apache Kafka is a popular open-source platform for distributed event streaming. Debezium Server supports publishing captured change events to a configured Kafka message broker.

Property Default Description

Must be set to kafka.

The Kafka sink adapter supports pass-through configuration. This means that all Kafka producer configuration properties are passed to the producer with the prefix removed. At least bootstrap.servers, key.serializer and value.serializer properties must be provided. The topic is set by Debezium.

30000

The maximum time, in milliseconds, that the server waits for a request to complete and return metadata for a record. The specified timeout also governs the interval that the server waits for Kafka to respond to a request. Set the value to 0 to disable the timeout.

Injection points

The Kafka sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

Custom implementation maps the original destination (topic) name into another Kafka topic. By default, the same name is used.

Pravega

Pravega is a cloud-native storage system for event streams and data streams. This sink offers two modes: non-transactional and transactional. The non-transactional mode individually writes each event in a Debezium batch to Pravega. The transactional mode writes the Debezium batch to a Pravega transaction that commits when the batch is completed.

The Pravega sink expects destination scope and streams to already be created.
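
The practical difference between the two modes shows up when a batch fails part of the way through. The Python sketch below is a toy in-memory model, not Pravega client code: InMemoryStream, serialize, and write_batch are hypothetical names, and a serialization error stands in for any failure during the batch.

```python
class InMemoryStream:
    """A stand-in for a destination stream; holds the visible events."""

    def __init__(self):
        self.events = []


def serialize(event) -> bytes:
    if event is None:
        raise ValueError("cannot serialize an empty event")
    return str(event).encode("utf-8")


def write_batch(stream: InMemoryStream, batch, transactional: bool) -> None:
    if transactional:
        # Stage the whole batch first; nothing is visible until the commit.
        pending = [serialize(event) for event in batch]
        stream.events.extend(pending)  # commit
    else:
        # Each event becomes visible as soon as it is written.
        for event in batch:
            stream.events.append(serialize(event))
```

In this model, a failure in the middle of a non-transactional batch leaves the earlier events in the stream, whereas a failed transactional batch leaves the stream unchanged.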

Property Default Description

Must be set to pravega.

tcp://localhost:9090

The connection string to a Controller in the Pravega cluster.

The name of the scope in which to find the destination streams.

false

Set to true to have the sink use Pravega transactions for each Debezium batch.

Injection points

Pravega sink behavior can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

Custom implementation maps the planned destination (stream) name into a physical Pravega stream name. By default the same name is used.

Infinispan

Infinispan is an open-source in-memory data grid that offers a rich set of cache types as well as cache stores. Because of its very fast data access, Infinispan can be used, among other things, as a data source for various data processing and analytical tools.

The Infinispan sink expects that the destination cache is already defined and created within the Infinispan cluster.

Property Default Description

Must be set to infinispan.

The host name of one of the servers of the Infinispan cluster (can be also a comma-separated list of servers).

11222

The port of the Infinispan server.

The name of the (existing) cache where the records will be stored.

(Optional) The user name used for connecting to Infinispan cluster.

(Optional) The password used for connecting to Infinispan cluster.

Injection points

The Infinispan sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

@CustomConsumerBuilder

Custom instance of Hot Rod cache which will be used for connecting and sending events to the Infinispan cluster.

Apache RocketMQ

Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability. Debezium Server supports publishing captured change events to a configured RocketMQ cluster.

Property Default Description

Must be set to rocketmq.

Name server address of Apache RocketMQ.

Producer group of Apache RocketMQ.

4 MB (a value of less than 4 MB is suggested)

(Optional) Maximum number of bytes of sent message body.

3000ms

(Optional) The timeout for sending a message, that is, how long the client waits on a local synchronous send call. Choose a value that suits your application to avoid blocking threads for long periods.

false

(Optional) Specifies whether access authorization is enabled.

(Optional) The access key used for connecting to the Apache RocketMQ cluster.

(Optional) The access secret used for connecting to the Apache RocketMQ cluster.

Injection points

The RocketMQ sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

@CustomConsumerBuilder

Custom configured instance of a RocketMQ used to publish messages to target topic.

Custom implementation maps the planned destination (stream) name into a RocketMQ topic name. By default the same name is used.

RabbitMQ Stream

RabbitMQ is an open source message broker that supports multiple messaging protocols and can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. RabbitMQ supports message queues and streams. Debezium Server supports publishing captured change events to a configured RabbitMQ Stream.

Property Default Description

Must be set to rabbitmq.

localhost

Host of RabbitMQ server.

5672

Port of RabbitMQ server.

The RabbitMQ module supports pass-through configuration. The connection configuration properties are passed to the RabbitMQ client with the prefix removed.

30000

Defines the maximum time, in milliseconds, to wait for a confirmation from the broker after publishing a message.

topic name

(Optional) Exchange name to use when publishing messages.

empty string

(Optional) Static routing key to use when publishing messages.

false

(Optional) If true, a routing key that does not exist is created automatically.

true

(Optional) If true the target queue content will survive a RabbitMQ server restart.

false

(Optional) Deprecated, see debezium.sink.rabbitmq.routingKey.source.

2

(Optional) How the message is delivered to and stored on the RabbitMQ server:

  • 1 - Non-persistent

  • 2 - Persistent

default

RabbitMQ does not support the notion of null payloads, as is the case with tombstone events. So this string will be used as value for records without a payload.

static

(Optional) The way the routing key for the event is going to be obtained.

  • static (default): the routing key will be obtained from debezium.sink.rabbitmq.routingKey.

  • topic: the routing key is the same as the exchange name.

  • key: the routing key will be obtained from the record key.
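
The three routing key sources listed above can be summarized in a short Python sketch. It is an illustration of the described rules under the documented defaults (the exchange defaults to the topic name and the static routing key to an empty string), and resolve_routing is a hypothetical helper, not Debezium code.

```python
def resolve_routing(topic, record_key, source="static", static_key="", exchange=None):
    """Return (exchange name, routing key) for one change event."""
    # The exchange name defaults to the topic name when it is not configured.
    exchange_name = exchange if exchange is not None else topic
    if source == "static":
        return exchange_name, static_key
    if source == "topic":
        return exchange_name, exchange_name
    if source == "key":
        return exchange_name, record_key
    raise ValueError(f"unknown routing key source: {source}")
```

For example, with the source set to key, two events for the same row carry the same routing key and are therefore routed the same way by the exchange.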

Injection points

RabbitMQ sink behavior can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface CDI classifier Description

Custom implementation maps the planned destination (stream) name into a RabbitMQ exchange name and (if enabled) into the routing key name. By default the same name is used.

RabbitMQ Native Stream

RabbitMQ 3.9 introduced Streams, which use a dedicated, high-throughput protocol that can be used alongside AMQP 0.9.1. Streams are well suited to large fan-outs, replay and time travel, and large logs, with throughput of millions of messages per second.

Debezium Server supports publishing captured change events to native RabbitMQ Streams by using the RabbitMQ Stream Java Client.

Property Default Description

Must be set to rabbitmqstream.

localhost

Host of RabbitMQ server.

5552

Port of RabbitMQ Stream Protocol.

The RabbitMQ module supports pass-through configuration. The connection configuration properties are passed to the RabbitMQ client with the prefix removed.

30000

Defines the maximum time, in milliseconds, to wait for a confirmation from the broker after publishing a message.

default

RabbitMQ does not support the notion of null payloads, as is the case with tombstone events. So this string will be used as value for records without a payload.

Milvus

Milvus is an open-source vector database designed for similarity search and the retrieval of high-dimensional data, such as embeddings from machine learning models (for example, text, images, and audio). You can use Milvus to process vector datatypes that are captured from a source database, or use it with a transformation to calculate vectors from message fields and then use them as embeddings.

The Milvus sink ingests incoming messages and upserts the after part of each message into a collection. Collection names cannot contain dots, so the sink replaces all dots with underscore characters. When a delete message arrives, the matching record is removed from the collection.
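
The behaviour described above can be modelled with a plain dictionary in place of a Milvus collection. The Python sketch below is illustrative only: collection_name and apply_event are hypothetical helpers, and the event is assumed to carry the op and after fields of a Debezium change event, with op set to d for deletes.

```python
def collection_name(destination: str) -> str:
    """Replace dots, which collection names cannot contain, with underscores."""
    return destination.replace(".", "_")


def apply_event(collections: dict, destination: str, key, event: dict) -> None:
    """Upsert the after state, or remove the record on a delete event."""
    collection = collections.setdefault(collection_name(destination), {})
    if event.get("op") == "d":
        collection.pop(key, None)
    else:
        collection[key] = event["after"]


collections = {}
apply_event(collections, "server.inventory.products", 1,
            {"op": "c", "after": {"id": 1, "embedding": [0.1, 0.2]}})
apply_event(collections, "server.inventory.products", 1,
            {"op": "u", "after": {"id": 1, "embedding": [0.3, 0.4]}})
snapshot = dict(collections["server_inventory_products"])
apply_event(collections, "server.inventory.products", 1,
            {"op": "d", "after": None})
```

After the create and update events the collection holds one record with the latest embedding, and the delete event removes it again.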

Property Default Description

Specifies the type of sink. Must be set to milvus.

(Optional) The URL to access the Milvus database instance.

default

(Optional) The name of the database that contains the target collections.

Injection points

You can modify the behavior of the Milvus sink connector by applying custom logic that provides alternative implementations for specific functions. If the alternative implementations are not available, the connector uses the default implementations.

Interface CDI classifier Description

@CustomConsumerBuilder

An instance of a custom MilvusClientV2 client that is configured to access target collections.

Custom implementation that maps the name of the planned destination topic to a Milvus collection. By default, dots in a name are replaced with underscores.

Qdrant Sink

Qdrant is an open-source vector database optimized for vector similarity search and extended with powerful filtering capabilities. It’s designed for high-load applications, and it enables you to efficiently store, manage, and search embedding vectors. You can use Qdrant to process vector datatypes that are captured directly from a source database, or you can use a transformation to calculate embeddings from message fields, and then send those embeddings to the database for processing.

The Qdrant sink ingests incoming messages and upserts the after part of each message into a collection. When a delete message arrives, the matching record is removed from the collection.

The sink behaves according to the following rules:

  • Each Debezium collection or table is mapped to a Qdrant collection.

  • Primary keys are required and are used as Qdrant point IDs (only INT64 and UUID are supported).

  • FloatVector and DoubleVector data can serve as sources for Qdrant vectors.

  • Non-primary key and non-vector fields are mapped to Qdrant payloads.

Property Default Description

No default value.

Specifies the type of sink. You must explicitly set the value to qdrant.

localhost

(Optional) The hostname to access the Qdrant database instance.

6333

(Optional) The port to access the Qdrant database instance.

(Optional) The API key required to authenticate with the Qdrant database instance.

No default value.

(Optional) Comma-separated list of collection-name:field-name pairs that explicitly define the vector fields to use for each collection.
This field is mandatory for source tables and collections that contain multiple vector fields.

No default value.

(Optional) Comma-separated list that specifies the subset of field names in a collection that represent the Qdrant collection payload.
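To make the collection-name:field-name format of the vector field setting concrete, the following sketch parses such a list into a map. It is an illustration only: the class name, the method name, and the handling of malformed entries are assumptions, and the sink's own parsing may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a comma-separated list of collection-name:field-name pairs.
final class VectorFieldPairs {

    // Returns a map from collection name to vector field name, in input order.
    static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        if (value == null || value.isBlank()) {
            return result;
        }
        for (String pair : value.split(",")) {
            String trimmed = pair.trim();
            // Assumption: the first colon separates the collection from the field.
            int sep = trimmed.indexOf(':');
            if (sep <= 0 || sep == trimmed.length() - 1) {
                throw new IllegalArgumentException(
                        "Expected collection-name:field-name but got '" + trimmed + "'");
            }
            result.put(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // Collection and field names below are invented examples.
        System.out.println(parse("products:embedding, reviews:text_vector"));
    }
}
```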

Injection points

To modify connector behavior, you can apply custom logic to specify alternative implementations for certain functions. If you do not provide alternative implementations, the connector uses the default implementations.

Interface CDI classifier Description

@CustomConsumerBuilder

An instance of a custom QdrantClient client that is configured to access target collections.

No default value.

Custom implementation that maps the name of the planned destination topic to a Qdrant collection.

InstructLab

InstructLab is a community-driven project for augmenting large language models (LLMs) for use in generative artificial intelligence applications. Working with InstructLab, users who identify gaps in the capabilities of a base model can collaboratively develop a taxonomy to augment the model, with each contributor providing specific expertise and skills.

Debezium Server enables you to automate the process of adding skills and knowledge into a taxonomy by configuring a data sink based on InstructLab question and answer (qna.yml) files. The sink configuration defines a series of mappings that are used to derive question, answer, and context values from an event stream. These mappings can source values directly from fields within the event payload, headers, or statically configured constants. Periodically, users can then use InstructLab to train the model to take advantage of the new skills and knowledge added to the taxonomies.

Property Default Description

Must be set to instructlab.

The absolute path to the root directory where you store InstructLab taxonomy skills and knowledge. This value is used in conjunction with taxonomy domain properties to construct the full path to the qna.yml file.

A comma-separated list of the symbolic names of taxonomy mappings.

.*

A regular expression that is used to match topics to determine whether to apply the <name> taxonomy mapping.

Specifies a Mapping definition to use as the seed example’s question attribute in the qna.yml file. This is required.

Specifies a Mapping definition to use as the seed example’s answer attribute in the qna.yml file. This is required.

Specifies a Mapping definition to use as the seed example’s context attribute in the qna.yml file. This is optional.

Specifies the taxonomy domain, a series of directories, separated by /, that lead to the qna.yml file, excluding the taxonomy base path. For example, a value of a/b and a base path of /taxonomy represents /taxonomy/a/b/qna.yml.
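The way the taxonomy base path and the domain combine into the location of the qna.yml file can be sketched as follows. The class and method names are hypothetical, and the sketch only mirrors the documented example. It is not the sink's implementation.

```java
import java.nio.file.Path;

// Hypothetical helper that joins a taxonomy base path and a domain
// into the path of the qna.yml file, as in the documented example.
final class TaxonomyPaths {

    static Path qnaFile(String basePath, String domain) {
        // The domain is assumed to be relative, for example "a/b".
        return Path.of(basePath).resolve(domain).resolve("qna.yml");
    }

    public static void main(String[] args) {
        System.out.println(qnaFile("/taxonomy", "a/b"));
    }
}
```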

Mapping definition

In the InstructLab sink configuration, you can set properties to specify how Debezium Server maps fields in event messages to question, answer, and context attributes in an InstructLab qna.yml file.

For each attribute type that you want to populate in the qna.yml file, you specify a mapping prefix that determines the message field from which Debezium extracts the value. You can specify the following prefix values:

Value

If you prefix a mapping definition with the string value:, Debezium extracts the value of the specified field from the incoming event payload. For example, to populate an InstructLab question attribute with the value of the abc field in the event payload, set the debezium.sink.instructlab.taxonomy.<name>.question property to value:abc. When Debezium processes the message, it takes the value of the payload field abc and adds it as the question attribute in the qna.yml file.

For events that have a Debezium-structured payload, Debezium extracts the specified field from the after section of the payload. If the event is flattened, the field is sourced directly from the event’s value.

Header

If you prefix a mapping definition with the string header:, Debezium extracts the value of the specified header field of the incoming event message. For example, if you specify the mapping header:h1, when Debezium detects a header with the name h1 in a source message, it extracts the value of the h1 field.

Constant

If the mapping definition includes neither the header: nor the value: prefix, Debezium treats the configured string as a constant and uses it as-is. Use this option when you want to map a static constant to an attribute in the qna.yml file.
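The three mapping kinds can be summarized in a small sketch. It is a simplified illustration under stated assumptions: the class and method names are hypothetical, and the event payload and headers are modeled as plain string maps rather than Debezium event structures.

```java
import java.util.Map;

// Hypothetical resolver that illustrates the value:, header:, and constant
// mapping kinds. Payload and headers are simplified to string maps.
final class MappingResolver {

    private static final String VALUE_PREFIX = "value:";
    private static final String HEADER_PREFIX = "header:";

    static String resolve(String mapping, Map<String, String> payload, Map<String, String> headers) {
        if (mapping.startsWith(VALUE_PREFIX)) {
            // value:<field> reads the named field from the event payload.
            return payload.get(mapping.substring(VALUE_PREFIX.length()));
        }
        if (mapping.startsWith(HEADER_PREFIX)) {
            // header:<name> reads the named header of the event message.
            return headers.get(mapping.substring(HEADER_PREFIX.length()));
        }
        // No prefix: the configured string is used as a constant.
        return mapping;
    }

    public static void main(String[] args) {
        // Field names, header names, and values below are invented examples.
        Map<String, String> payload = Map.of("abc", "What is Debezium?");
        Map<String, String> headers = Map.of("h1", "header value");
        System.out.println(resolve("value:abc", payload, headers));
        System.out.println(resolve("header:h1", payload, headers));
        System.out.println(resolve("A fixed context", payload, headers));
    }
}
```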

Extensions

Debezium Server uses the Quarkus framework and relies on dependency injection to enable developers to extend its behavior. Note that only the JVM mode of Quarkus is supported; native execution via GraalVM is not. You can extend the server in two ways by providing custom logic:

  • implementation of a new sink

  • customization of an existing sink - i.e. non-standard configuration

Implementation of a new sink

You can implement a new sink as a CDI bean that implements the DebeziumEngine.ChangeConsumer interface, is annotated with @Named and a unique name, and has the @Dependent scope. The name of the bean is used as the value of the debezium.sink.type option.

The sink must read its configuration by using the MicroProfile Config API. The execution path must pass the messages to the target system and regularly commit the processed messages.

See the Kinesis sink implementation for further details.

Customization of an existing sink

Some sinks expose dependency injection points that enable users to provide their own beans to modify the behavior of the sink. Typical examples are fine-tuning the target client setup and customizing destination naming.

See an example of a custom topic naming policy implementation for further details.

Cassandra connector

Running Debezium Server with Cassandra connector

Running with Java 11 or later requires setting the following Java options at startup through the JDK_JAVA_OPTIONS environment variable or an equivalent mechanism:

JDK_JAVA_OPTIONS="--add-exports java.base/jdk.internal.misc=ALL-UNNAMED --add-exports java.base/jdk.internal.ref=ALL-UNNAMED --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED --add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.server=ALL-UNNAMED --add-exports java.sql/java.sql=ALL-UNNAMED  --add-opens java.base/java.lang.module=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.math=ALL-UNNAMED --add-opens java.base/jdk.internal.module=ALL-UNNAMED --add-opens java.base/jdk.internal.util.jar=ALL-UNNAMED --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens java.base/sun.nio.ch=ALL-UNNAMED"

A basic sample application.properties for running the Cassandra connector with the Redis sink:

# Sink
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379

# Connector
debezium.source.connector.class=io.debezium.connector.cassandra.Cassandra4Connector
## node.id must be unique for each connector running on each Cassandra node
debezium.source.cassandra.node.id=sample_node_01
debezium.source.cassandra.hosts=127.0.0.1
debezium.source.cassandra.port=9042
debezium.source.cassandra.config=/opt/cassandra/conf/cassandra.yaml
debezium.source.commit.log.relocation.dir=cassandra/relocdir
debezium.source.offset.storage=io.debezium.server.redis.RedisOffsetBackingStore
debezium.source.topic.prefix=sample_prefix
## internal Cassandra http port
debezium.source.http.port=8040

Transformation for Operation Code

By default, the Cassandra connector has its own operation codes, which are not entirely compatible with Debezium operation codes. If needed, you can define a specific transform in the Debezium Server application.properties to enable the conversion:

debezium.transforms=EnvelopeTransformation
debezium.transforms.EnvelopeTransformation.type=io.debezium.connector.cassandra.transforms.EnvelopeTransformation

This will convert Operation Codes as follows:

INSERT "i"          -> CREATE "c"
UPDATE "u"          -> UPDATE "u"
DELETE "d"          -> DELETE "d"
RANGE_TOMBSTONE "r" -> TRUNCATE "t"
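The conversion table above can also be expressed as a lookup, shown here purely as an illustration. The class and method names are hypothetical, this is not the code of the EnvelopeTransformation, and passing unknown codes through unchanged is an assumption of the sketch.

```java
import java.util.Map;

// Hypothetical lookup that mirrors the documented operation code conversion.
// It is not the EnvelopeTransformation implementation.
final class OperationCodes {

    private static final Map<String, String> CASSANDRA_TO_DEBEZIUM = Map.of(
            "i", "c",  // INSERT          -> CREATE
            "u", "u",  // UPDATE          -> UPDATE
            "d", "d",  // DELETE          -> DELETE
            "r", "t"); // RANGE_TOMBSTONE -> TRUNCATE

    static String toDebezium(String cassandraOp) {
        // Assumption: codes that are not in the table are returned unchanged.
        return CASSANDRA_TO_DEBEZIUM.getOrDefault(cassandraOp, cassandraOp);
    }

    public static void main(String[] args) {
        for (String op : new String[] {"i", "u", "d", "r"}) {
            System.out.println(op + " -> " + toDebezium(op));
        }
    }
}
```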