Create new topics / pipes

If you run Kafka Connect in distributed mode, you may have noticed that starting Kafka Connect already creates some internal topics for you:

$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --list

connect_configs
connect_offsets
connect_statuses

This is done automatically for you by Kafka Connect with a sane, customized default topic configuration that fits the needs of these internal topics.

When you start a Debezium connector, the topics for the captured events are created by the Kafka broker, based on the broker's default (possibly customized) topic configuration, provided that auto.create.topics.enable = true is set in the broker config:

auto.create.topics.enable = true
default.replication.factor = 1
num.partitions = 1
compression.type = producer
log.cleanup.policy = delete
log.retention.ms = 604800000  ## 7 days

But often, when you use Debezium and Kafka in a production environment, you might choose to disable Kafka’s topic auto-creation capability with auto.create.topics.enable = false, or you might want the connector topics to be configured differently from the default. In this case you have to create the topics for Debezium’s captured data sources upfront.
But there’s good news! Beginning with Kafka 2.6.0, this can be automated, because Kafka Connect implements KIP-158, which enables customizable topic creation.
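For comparison, creating one of these topics by hand upfront might look like the following sketch. It wraps the same kafka-topics.sh tool used above; the function name create_topic and the KAFKA_TOPICS and BOOTSTRAP_SERVER variables are illustrative assumptions, not part of Kafka or Debezium.

```shell
# Hypothetical helper: create one topic with an explicit configuration.
# KAFKA_TOPICS lets you point at the CLI of your installation (assumed name).
KAFKA_TOPICS="${KAFKA_TOPICS:-kafka-topics.sh}"
BOOTSTRAP_SERVER="${BOOTSTRAP_SERVER:-localhost:9092}"

create_topic() {
    # usage: create_topic <topic> <partitions> <replication-factor> [key=value ...]
    topic="$1"; partitions="$2"; replication="$3"
    shift 3
    config_args=""
    for kv in "$@"; do
        config_args="$config_args --config $kv"
    done
    # config_args is intentionally unquoted so it splits into separate arguments
    "$KAFKA_TOPICS" --bootstrap-server "$BOOTSTRAP_SERVER" --create \
        --topic "$topic" --partitions "$partitions" \
        --replication-factor "$replication" $config_args
}
```

A call such as `create_topic dbserver1.inventory.products 20 1 cleanup.policy=delete retention.ms=7776000000` would have to be repeated for every captured table, which is exactly the manual work that the Kafka Connect topic creation feature removes.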

Kafka Connect

Since Kafka 2.6.0, Kafka Connect comes with topic creation enabled by default:

topic.creation.enable = true

If you don’t want to allow automatic topic creation by connectors, you can set this value to false in the Kafka Connect config (the connect-distributed.properties file, or the environment variable CONNECT_TOPIC_CREATION_ENABLE when using Debezium’s container image for Kafka Connect).

Updating Connector Configuration

Kafka Connect topic creation works with groups. There’s always a default group which is used when there’s no other group defined that matches the topic.

Every group can specify a collection of topic configuration properties and a list of regular expressions matching the topic names that the configuration should apply to.

You can use any topic-level configuration parameter to customize how the matched topics of the group will be created.

Let’s see how we can extend this Postgres config for the Kafka Connect topic creation:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": 1,
        "database.hostname": "postgres",
        "database.port": 5432,
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory"
    }
}
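A config like this is typically registered through the Kafka Connect REST API. The following is a minimal sketch, assuming the Connect worker is reachable at http://localhost:8083 (8083 is the default REST port) and that the JSON above is saved in a file; the function names and the file name inventory-connector.json are hypothetical.

```shell
# Assumed location of the Kafka Connect REST API; adjust to your setup.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

register_connector() {
    # usage: register_connector <path-to-json-file>
    # POST /connectors expects a JSON document with "name" and "config" fields.
    curl -sS -X POST \
        -H "Accept: application/json" \
        -H "Content-Type: application/json" \
        --data "@$1" \
        "$CONNECT_URL/connectors"
}

connector_status() {
    # usage: connector_status <connector-name>
    # GET /connectors/<name>/status reports the state of the connector and its tasks.
    curl -sS "$CONNECT_URL/connectors/$1/status"
}
```

With these helpers, `register_connector inventory-connector.json` followed by `connector_status inventory-connector` would submit the connector and show whether it is running. Note that a POST for a connector name that already exists is rejected, so an existing connector has to be deleted or updated through its config endpoint instead.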

Default Config

All topics that don’t match any other topic.creation group get the default group config.
As the default, we want replication.factor = 3 and partitions = 10; the topics should be log compacted with cleanup.policy = "compact", and all messages should be LZ4-compressed on disk with compression.type = "lz4".
So we configure the default group as follows:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": 1,
        "database.hostname": "postgres",
        "database.port": 5432,
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory",

        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 10,
        "topic.creation.default.cleanup.policy": "compact",
        "topic.creation.default.compression.type": "lz4"
    }
}

Productlog Config

In the database’s inventory schema, there are tables whose names start with product.
By default, Debezium captures each table to a topic named after the fully qualified table name; for example, the table products in the inventory schema of dbserver1 is captured to the topic dbserver1.inventory.products.

We want all messages for tables whose names start with product to be stored in topics with a retention time of 90 days (about 3 months), using cleanup.policy = "delete" and retention.ms = 7776000000, with replication.factor = 1 and partitions = 20, and simply keeping the compression format used by the producer via compression.type = "producer".
You can leave out properties that match the cluster defaults, but be careful: once you change the default config on your Kafka brokers, the resulting topic config might differ!
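The retention.ms value is simply the retention period expressed in milliseconds. A small sketch of the conversion, where days_to_ms is a hypothetical helper name:

```shell
# Convert a retention period in days to milliseconds for retention.ms.
days_to_ms() {
    # days * hours * minutes * seconds * milliseconds
    echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 90   # prints 7776000000
days_to_ms 7    # prints 604800000
```

The second call reproduces the 7-day broker default for log.retention.ms shown earlier.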

First we need to register a productlog group using the topic.creation.groups property.
Then we can define what topic names should be included in that group and specify the configuration of our group like we did with the default group:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": 1,
        "database.hostname": "postgres",
        "database.port": 5432,
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory",
        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 10,
        "topic.creation.default.cleanup.policy": "compact",
        "topic.creation.default.compression.type": "lz4",

        "topic.creation.groups": "productlog",  (1)

        "topic.creation.productlog.include": "dbserver1\\.inventory\\.product.*",  (2)
        "topic.creation.productlog.replication.factor": 1,
        "topic.creation.productlog.partitions": 20,
        "topic.creation.productlog.cleanup.policy": "delete",
        "topic.creation.productlog.retention.ms": 7776000000,
        "topic.creation.productlog.compression.type": "producer"
    }
}

Table 1. Connector configuration for customized automatic topic creation

Item  Description
1     topic.creation.groups defines a comma-separated list of additional group names. Here we only define our productlog group.
2     The topic.creation.productlog.include property holds a comma-separated list of regular expressions that match the topic names to which the productlog group config should be applied. The productlog group matches all topics starting with dbserver1.inventory.product.
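To get a feeling for which group a topic ends up in, the matching can be modeled with a few lines of shell. This is a simplified sketch and not Kafka Connect’s implementation: it assumes that the first listed group whose include pattern matches the whole topic name wins, it ignores exclude patterns, and it uses grep’s extended regular expressions instead of Java’s. The names TOPIC_GROUPS and resolve_group are made up for this illustration.

```shell
# Hypothetical "group=regex" list, one entry per line, in the order of
# topic.creation.groups. The regex uses single backslashes here; the
# doubled backslashes in the connector config are only JSON escaping.
TOPIC_GROUPS='productlog=dbserver1\.inventory\.product.*'

resolve_group() {
    # usage: resolve_group <topic-name>
    topic="$1"
    match=$(printf '%s\n' "$TOPIC_GROUPS" | while IFS='=' read -r group pattern; do
        # grep -x requires the pattern to match the entire topic name
        if printf '%s\n' "$topic" | grep -Eqx -- "$pattern"; then
            echo "$group"
            break
        fi
    done)
    # fall back to the default group when no other group matched
    echo "${match:-default}"
}

resolve_group dbserver1.inventory.products   # prints productlog
resolve_group dbserver1.inventory.orders     # prints default
```

Testing a pattern this way before deploying the connector can help catch an include expression that is too broad or too narrow.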

Exploring the Results

When we now start our connector and use kafka-topics.sh to see how the topics were created, we can see that everything worked as defined:

## the `dbserver1.inventory.products` topic has the config from the `productlog` group:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.products

Topic: dbserver1.inventory.products     PartitionCount: 20      ReplicationFactor: 1
Configs: compression.type=producer,cleanup.policy=delete,retention.ms=7776000000,segment.bytes=1073741824

## the `dbserver1.inventory.orders` topic has the config from the `default` group:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.orders

Topic: dbserver1.inventory.orders       PartitionCount: 10       ReplicationFactor: 3
Configs: compression.type=lz4,cleanup.policy=compact,segment.bytes=1073741824,delete.retention.ms=2592000000

Conclusion

In many environments, especially in production, we don’t want topic auto-creation to be enabled on the Kafka broker side, or we need a configuration that differs from the default topic config.
Prior to Kafka 2.6, this was only possible by manually creating topics upfront or by some custom setup process, for example during deployment.

Since Kafka 2.6, Kafka Connect comes with built-in topic creation for connector topics, and this article shows how to use it with Debezium.

You can find an example in the Debezium examples repository on GitHub.

René Kerner

René is a software engineer at Red Hat. Before that, he worked as a software architect and engineer at trivago and as a consultant at Codecentric. He is now part of the Debezium team. He lives in Mönchengladbach, Germany.

