Debezium Server

This feature is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive.

Please let us know if you encounter any problems while using this feature. Also please reach out if you have requirements for specific sinks to be supported by Debezium Server or even would be interested in contributing the required implementation.

Debezium provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Amazon Kinesis, Google Cloud Pub/Sub or Apache Pulsar. For streaming change events to Apache Kafka, it is recommended to deploy the Debezium connectors via Kafka Connect.

Installation

To install the server download and unpack the server distribution archive:

A directory named debezium-server will be created with these contents:

debezium-server/
|-- CHANGELOG.md
|-- conf
|-- CONTRIBUTE.md
|-- COPYRIGHT.txt
|-- debezium-server-1.2.0.Final-runner.jar
|-- lib
|-- LICENSE-3rd-PARTIES.txt
|-- LICENSE.txt
|-- README.md
`-- run.sh

The server is started using run.sh script, dependencies are stored in the lib directory, and the directory conf contains configuration files.

Configuration

Debezium Server uses MicroProfile Configuration for configuration. This means that the application can be configured from disparate sources like configuration files, environment variables, system properties etc.

The main configuration file is conf/application.properties. There are multiple sections configured:

  • debezium.source is for source connector configuration; each instance of Debezium Server runs exactly one connector

  • debezium.sink is for the sink system configuration

  • debezium.format is for the output serialization format configuration

  • debezium.transforms is for the configuration of message transformations

An example configuration file can look like so:

debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=inventory

When the server is started it generates a seqeunce of log messages like this:

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-15 11:33:12,189 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = key
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,757 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = false

2020-05-15 11:33:12,763 INFO  [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
	access.control.allow.methods =
	access.control.allow.origin =
	admin.listeners = null
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = default
	config.providers = []
	connector.client.config.override.policy = None
	header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
	internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
	internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
	key.converter = class org.apache.kafka.connect.json.JsonConverter
	listeners = null
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	offset.flush.interval.ms = 0
	offset.flush.timeout.ms = 5000
	offset.storage.file.filename = data/offsets.dat
	offset.storage.partitions = null
	offset.storage.replication.factor = null
	offset.storage.topic =
	plugin.path = null
	rest.advertised.host.name = null
	rest.advertised.listener = null
	rest.advertised.port = null
	rest.extension.classes = []
	rest.host.name = null
	rest.port = 8083
	ssl.client.auth = none
	task.shutdown.graceful.timeout.ms = 5000
	topic.tracking.allow.reset = true
	topic.tracking.enable = true
	value.converter = class org.apache.kafka.connect.json.JsonConverter

2020-05-15 11:33:12,763 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = key
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,765 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,767 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.user = postgres
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.dbname = postgres
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.hostname = localhost
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.password = ********
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    name = kinesis
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.server.name = tutorial
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.port = 5432
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    schema.whitelist = inventory
2020-05-15 11:33:12,908 INFO  [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO  [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO  [io.quarkus] (main) Installed features: [cdi, smallrye-health]

Source configuration

The source configuration uses the same configuration properties that are described on the specific connector documentation pages (just with debezium.source prefix), together with few more specific ones, necessary for running outside of Kafka Connect:

Property

Default

Description

The name of the Java class implementing the source connector.

The file in which connector offsets are stored for non-Kafka deployments.

Defines how frequently the offsets are flushed into the file.

io.debezium.relational.history.KafkaDatabaseHistory

Some of the connectors (e.g MySQL, SQL Server, Db2, Oracle) monitors the database schema evolution over the time and stores the data in database history. This is by default based on Kafka. There are also other options available

  • io.debezium.relational.history.FileDatabaseHistory for non-Kafka deployments

  • io.debezium.relational.history.MemoryDatabaseHistory volatile store for test environments

The name and location of the file to which FileDatabaseHistory persists its data.

Format configuration

The message output format can be configured for both key and value separately. By default the output is in JSON format but an arbitrary implementation of Kafka Connect’s Converter can be used.

Property

Default

Description

json

The name of the output format for key, one of json/avro.

Configuration properties passed to the key converter.

json

The name of the output format for value, one of json/avro.

Configuration properties passed to the value converter.

Transformation configuration

Before the messages are delivered to the sink, they can run through a sequence of transformations. The server supports single message transformations defined by Kafka Connect. The configuration will need to contain the list of transformations, implementation class for each transformation and configuration options for each of the transformations.

Property

Default

Description

debezium.transformations

The comma separated list of symbolic names of transformations.

debezium.transformations.<name>.class

The name of Java class implementing the transformation with name <name>.

debezium.transformations.<name>.*

Configuration properties passed to the transformation with name <name>.

Addtional configuration

Debezium Server runs on top Quarkus framework. All configuration options exposed by Quarkus are available in Debezium Server too. The most frequent used are:

Property

Default

Description

quarkus.http.port

8080

The port on which Debezim exposes Microprofile Health endpoint and other exposed status information.

Sink configuration

Sink configuration is specific for each sink type. Currently the only supported sinks are Amazon Kinesis and Google Cloud Pub/Sub.

The sink is selected by configuration property debezium.sink.type.

Amazon Kinesis

Amazon Kinesis is an implementation of data streaming system with support for stream sharding and other techniques for high scalability. Kinesis exposes a set of REST APIs and provides a (not-only) Java SDK that is used to implement the sink.

Property

Default

Description

Must be set to kinesis.

A region name in which the Kinesis target streams are provided.

default

A credentials profile name used to communicate with Amazon API.

default

Kinesis does not support the notion of messages without key. So this string will be used as message key for messages from tables without primary key.

Injection points

The Kinesis sink behaviour can be modified by a custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface

CDI classifier

Description

@CustomConsumerBuilder

Custom configured instance of a KinesisClient used to send messages to target streams.

Custom implementation maps the planned destination (topic) name into a physical Kinesis stream name. By default the same name is used.

Google Cloud Pub/Sub

Google Cloud Pub/Sub is an implementation of messaging/eventing system designed for scalable batch and stream processing applications. Pub/Sub exposes a set of REST APIs and provides a (not-only) Java SDK that is used to implement the sink.

Property

Default

Description

Must be set to pubsub.

system-wide default project id

A project name in which the target topics are created.

true

Pub/Sub can optionally use a message key to guarantee the delivery of the messages in the same order as were sent for messages with the same order key. This feature can be disabled.

default

Tables without primary key sends messages with null key. This is not supported by Pub/Sub so a surrogate key must be used.

Injection points

The Pub/Sub sink behaviour can be modified by a custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

Interface

CDI classifier

Description

@CustomConsumerBuilder

A class that provides custom configured instance of a Publisher used to send messages to a dedicated topic.

Custom implementation maps the planned destination (topic) name into a physical Pub/Sub topic name. By default the same name is used.

Apache Pulsar

Apache Pulsar is high-performance, low-latency server for server-to-server messaging. Pulsar exposes a REST APIs and a native endopint provides a (not-only) Java client that is used to implement the sink.

Property Default Description

Must be set to pulsar.

The Pulsar module supports pass-through configuration. The client configuration properties are passed to the client with the prefix removed. At least serviceUrl must be provided.

The Pulsar module supports pass-through configuration. The message producer configuration properties are passed to the producer with the prefix removed. The topic is set by Debezium.

default

Tables without primary key sends messages with null key. This is not supported by Pulsar so a surrogate key must be used.

Extensions

Debezium Server uses the Quarkus framework and relies on dependency injection to enable developer to extend its behaviour. Note that only the JVM mode of Quarkus is supported, but not native execution via GraalVM. The server can be extended in two ways by providing a custom logic:

  • implementation of a new sink

  • customization of an existing sink - i.e. non-standard configuration

Implementation of a new sink

The new sink can be implemented as a CDI bean implementing interface DebeziumEngine.ChangeConsumer and with annotation @Named and unique name and scope @Dependent. The name of the bean is used as the debezium.sink.type option.

The sink needs to read the configuration using Microprofile Config API. The execution path must pass the messages into the target system and regularly commit the passed/processed messages.

See the Kinesis sink implementation for further details.

Customization of an existing sink

Some of the sinks exposes dependency injections points that enable users to provide its own bean that would modify the behaviour of the sink. Typical examples are fine tuning of the target client setup, the destination naming etc.

See an example of a custom topic naming policy implementation for further details.