When you are working with Kafka Connect Distributed then you might have realized that once you start Kafka Connect there are already some internal Kafka Connect related topics created for you:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --list
connect_configs
connect_offsets
connect_statuses
This is done automatically for you by Kafka Connect with a sane, customized default topic configuration that fits the needs of these internal topics.
When you start a Debezium connector the topics for the captured events are created by the Kafka broker based on a default, maybe customized, configuration in the broker if auto.create.topics.enable = true
is enabled in the broker config:
auto.create.topics.enable = true
default.replication.factor = 1
num.partitions = 1
compression.type = producer
log.cleanup.policy = delete
log.retention.ms = 604800000 ## 7 days
But often, when you use Debezium and Kafka in a production environment you might choose to disable Kafka’s topic auto creation capability with auto.create.topics.enable = false
, or you want the connector topics to be configured differently from the default. In this case you have to create topics for Debezium’s captured data sources upfront.
But there’s good news! Beginning with Kafka Connect version 2.6.0, this can be automated since KIP-158 is implemented to enable customizable topic creation with Kafka Connect.
Kafka Connect
Kafka Connect since Kafka 2.6.0 comes with topic creation enabled:
topic.creation.enable = true
If you don’t want to allow automatic topic creation by connectors you can set this value to false
in the Kafka Connect config (connect-distributed.properties file or via environment variable CONNECT_TOPIC_CREATION_ENABLE when using Debezium’s container image for Kafka Connect).
Updating Connector Configuration
Kafka Connect topic creation works with groups. There’s always a default
group which is used when there’s no other group defined that matches the topic.
Every group can specify a collection of topic configuration properties, and a regular expression list of topic names that config should apply to.
You can specify all topic level configuration parameters to customize how the matched topics of the group will be created.
Let’s see how we can extend this Postgres config for the Kafka Connect topic creation:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory"
}
}
Default Config
All topics not matching other topic.creation
groups will apply the default
group config.
As default we want replication.factor = 3
, partitions = 10
, the topic should be key compacted with cleanup.policy = "compact"
, and all messages should be LZ4 compressed on harddisk with compression.type = "lz4"
.
So we configure for the default group:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4"
}
}
Productlog Config
In the databases inventory
schema there are tables starting with product
as table name.
As default the fully qualified table names are captured to the topic with the same name with Debezium, for example the table products
in the inventory
schema of dbserver1
is captured to the topic dbserver1.inventory.products
.
We want that all messages that go to a topic for table names starting with product
are stored in a topic with a retention time of 3 months / 90 days with cleanup.policy": "delete"
and retention.ms = 7776000000
, replication.factor = 1
, partitions = 20
, and just use the compression format that’s used by the producer compression.type": "producer"
.
You can leave out properties that match the cluster defaults, but be careful once you change the default config on your Kafka brokers the resulting topic config might differ!
First we need to register a productlog
group using the topic.creation.groups
property.
Then we can define what topic names should be included in that group and specify the configuration of our group like we did with the default
group:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"topic.creation.groups": "productlog", (1)
"topic.creation.productlog.include": "dbserver1\\.inventory\\.product.*", (2)
"topic.creation.productlog.replication.factor": 1,
"topic.creation.productlog.partitions": 20,
"topic.creation.productlog.cleanup.policy": "delete",
"topic.creation.productlog.retention.ms": 7776000000,
"topic.creation.productlog.compression.type": "producer"
}
}
Item | Description |
---|---|
1 |
|
2 | The |
Exploring the Results
When we now start our connector and use kafka-topics.sh
to see how the topics were created, we can see that all worked as defined:
## the `dbserver1.inventory.products` topic has the config from the `productlog` group:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.products
Topic: dbserver1.inventory.products PartitionCount: 20 ReplicationFactor: 1
Configs: compression.type=producer,cleanup.policy=delete,retention.ms=7776000000,segment.bytes=1073741824
## the `dbserver1.inventory.orders` topic has the config from the `default` group:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.orders
Topic: dbserver1.inventory.orders PartitionCount: 10 ReplicationFactor: 3
Configs: compression.type=lz4,cleanup.policy=compact,segment.bytes=1073741824,delete.retention.ms=2592000000
Conclusion
In many, especially in production environments we often don’t want topic auto creation to be enabled on the Kafka broker side, or we need a different configuration than the default topic config.
Prior Kafka 2.6 this was only possible when manually creating topics upfront or by some custom setup process, maybe during deployment.
Since Kafka 2.6 Kafka Connect comes with built-in topic creation for connector topics and this article shows how to use it with Debezium.
You can find an example here in the Debezium examples repository on GitHub.
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve ours existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.