Customizing Topic Auto-Creation

Debezium automatically creates the internal topics for offsets, connector status, configuration storage, and database history. The destination topics for captured tables are created automatically by the Kafka brokers, with a default configuration, when auto.create.topics.enable is set to true.
When topic auto-creation is disabled on the brokers, as is common in production environments, or when the topics need a different configuration, the topics have to be created up front, either manually or by a custom deployment process. This was the only option before Kafka 2.6.0.

Since Kafka 2.6.0, Kafka Connect supports customizable topic auto-creation.

Set up Kafka Connect

Starting with Kafka 2.6.0, Kafka Connect comes with topic creation enabled:

topic.creation.enable = true

If you don’t want connectors to create topics automatically, set this value to false in the Kafka Connect configuration (the connect-distributed.properties file, or the CONNECT_TOPIC_CREATION_ENABLE environment variable when using Debezium’s container image for Kafka Connect).

Configuration

Topic auto-creation is based on groups. Every custom group has an include and an exclude property, each a comma-separated list of regular expressions that match the names of topics to include or exclude.

You can specify both include and exclude parameters, but exclusion rules take precedence and override any inclusion rules for topics.

You don’t have to specify any custom group. When no custom group is registered, or when no registered group’s include patterns match the topic to be created, the default configuration is used.
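
The selection rules above can be sketched in a few lines of Python. This is only an illustration, not Kafka Connect's implementation: it assumes that a pattern must match the whole topic name and that groups are checked in the order they are listed, and it uses Python's re module rather than Java regular expressions. The group name, patterns, and topic names are hypothetical.

```python
import re

def select_group(topic, groups):
    """Return the first group whose rules accept the topic, else "default".

    groups is an ordered list of (name, include_patterns, exclude_patterns).
    """
    for name, includes, excludes in groups:
        included = any(re.fullmatch(p, topic) for p in includes)
        excluded = any(re.fullmatch(p, topic) for p in excludes)
        # Exclusion takes precedence over inclusion.
        if included and not excluded:
            return name
    # No custom group accepted the topic, so the default group applies.
    return "default"

# Hypothetical group and topic names, for illustration only.
groups = [
    ("orders", [r"shop\.orders\..*"], [r"shop\.orders\.tmp_.*"]),
]

print(select_group("shop.orders.items", groups))       # orders
print(select_group("shop.orders.tmp_import", groups))  # default
print(select_group("shop.users.accounts", groups))     # default
```

The second topic matches the include pattern but is also matched by the exclude pattern, so it ends up in the default group.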

You can specify any topic-level configuration parameter to customize how topics are created.

See the Configuring Debezium Topics section in the Debezium installation guide for general topic configuration considerations.

Default group configuration

The default configuration can be passed in the connector configuration JSON like this:

{
    ...

    "topic.creation.default.replication.factor": 3,  (1)
    "topic.creation.default.partitions": 10,  (2)
    "topic.creation.default.cleanup.policy": "compact",  (3)
    "topic.creation.default.compression.type": "lz4"  (4)

     ...
}
Table 1. Connector configuration for the default topic creation group
Item Description

1

topic.creation.default.replication.factor defines the replication factor for topics created by the default group.
replication.factor is mandatory for the default group but optional for custom groups. Custom groups fall back to the default group’s value if it is not set. Use -1 to use the Kafka broker’s default value.

2

topic.creation.default.partitions defines the number of partitions for topics created by the default group.
partitions is mandatory for the default group but optional for custom groups. Custom groups fall back to the default group’s value if it is not set. Use -1 to use the Kafka broker’s default value.

3

topic.creation.default.cleanup.policy is mapped to the cleanup.policy property of the topic-level configuration parameters and defines the log retention policy.

4

topic.creation.default.compression.type is mapped to the compression.type property of the topic-level configuration parameters and defines how messages are compressed on disk.

As you can see, you can use any topic-level configuration parameter as a property.

Note that the replication.factor and partitions properties are mandatory for the default group and optional for custom groups. Custom groups fall back to the default group’s value if they are not set. Use -1 to use the Kafka broker’s default value.
This fallback does not apply to other configuration parameters.
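
As a rough illustration of this fallback rule, the following Python sketch merges a custom group's settings with the default group's. The function name and the custom group's values are hypothetical. The sketch reflects only the rule stated above, not Kafka Connect's actual code.

```python
# Only these two settings are inherited from the default group.
FALLBACK_KEYS = ("replication.factor", "partitions")

def effective_settings(group_settings, default_settings):
    """Return a custom group's settings plus the values it inherits."""
    resolved = dict(group_settings)
    for key in FALLBACK_KEYS:
        if key not in resolved:
            resolved[key] = default_settings[key]
    return resolved

default_settings = {
    "replication.factor": 3,
    "partitions": 10,
    "cleanup.policy": "compact",
    "compression.type": "lz4",
}

# Hypothetical custom group that sets partitions and a retention period only.
custom = {"partitions": 20, "retention.ms": 7776000000}

resolved = effective_settings(custom, default_settings)
print(resolved)
# {'partitions': 20, 'retention.ms': 7776000000, 'replication.factor': 3}
```

The custom group inherits replication.factor from the default group, while cleanup.policy and compression.type are not carried over.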

Custom group configuration

You can specify multiple groups. As with the default group, you group properties together by the group name. In your connector JSON, this looks like the following:

{
    ...

    (1)
    "topic.creation.inventory.include": "dbserver1\\.inventory\\..*",  (2)
    "topic.creation.inventory.partitions": 20,
    "topic.creation.inventory.cleanup.policy": "compact",
    "topic.creation.inventory.delete.retention.ms": 7776000000,

    (3)
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",  (4)
    "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",  (5)
    "topic.creation.applicationlogs.replication.factor": 1,
    "topic.creation.applicationlogs.partitions": 20,
    "topic.creation.applicationlogs.cleanup.policy": "delete",
    "topic.creation.applicationlogs.retention.ms": 7776000000,
    "topic.creation.applicationlogs.compression.type": "lz4",

     ...
}
Table 2. Connector configuration for custom inventory and applicationlogs topic creation groups
Item Description

1

First we define the configuration for the inventory group.
Note that the replication.factor and partitions properties are optional for custom groups. Custom groups fall back to the default group’s value if they are not set. Use -1 to use the Kafka broker’s default value.

2

topic.creation.inventory.include defines a regular expression that matches all topics whose names start with dbserver1.inventory followed by a dot. The configuration defined for the inventory group is applied only when the topic name matches this regular expression.

3

Then we define the configuration for the applicationlogs group.
Note that the replication.factor and partitions properties are optional for custom groups. Custom groups fall back to the default group’s value if they are not set. Use -1 to use the Kafka broker’s default value.

4

topic.creation.applicationlogs.include defines a regular expression that matches all topics whose names start with dbserver1.logs.applog-. The configuration defined for the applicationlogs group is applied only when the topic name matches this regular expression. Because an exclude property is also defined (item 5), the topics matched by this include expression may be further restricted by that exclude property.

5

topic.creation.applicationlogs.exclude defines a regular expression that matches all topics whose names start with dbserver1.logs.applog-old-. The configuration defined for the applicationlogs group is applied only when the topic name does not match this regular expression. Because an include property is also set for this group, the applicationlogs group applies only to topics whose names match the include regular expressions and do not match the exclude regular expressions.

Registering custom groups

Finally, we need to register the two defined custom groups inventory and applicationlogs with the topic.creation.groups property:

{
    ...

    "topic.creation.groups": "inventory,applicationlogs",

     ...
}

A complete connector JSON configuration looks like this:

{
    ...

    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.groups": "inventory,applicationlogs",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\..*",
    "topic.creation.inventory.replication.factor": 3,
    "topic.creation.inventory.partitions": 20,
    "topic.creation.inventory.cleanup.policy": "compact",
    "topic.creation.inventory.delete.retention.ms": 7776000000,
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
    "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
    "topic.creation.applicationlogs.replication.factor": 1,
    "topic.creation.applicationlogs.partitions": 20,
    "topic.creation.applicationlogs.cleanup.policy": "delete",
    "topic.creation.applicationlogs.retention.ms": 7776000000,
    "topic.creation.applicationlogs.compression.type": "lz4"
}
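
Before submitting a configuration like this, it can help to check it for the mistakes this page warns about. The following Python sketch is one possible pre-flight check and is not part of Debezium or Kafka Connect. The function name is hypothetical, the sample configuration is deliberately abbreviated and incomplete, and the check assumes that group names contain no dots.

```python
import json

PREFIX = "topic.creation."

def check_topic_creation(config):
    """Return a list of likely mistakes in topic.creation.* properties."""
    problems = []
    # replication.factor and partitions are mandatory for the default group.
    for key in ("replication.factor", "partitions"):
        if PREFIX + "default." + key not in config:
            problems.append("default group is missing " + key)
    groups_value = config.get(PREFIX + "groups", "")
    registered = [g.strip() for g in groups_value.split(",") if g.strip()]
    # Group names that appear in property keys (assumes no dots in names).
    configured = {
        key[len(PREFIX):].split(".", 1)[0]
        for key in config
        if key.startswith(PREFIX) and key != PREFIX + "groups"
    }
    for group in sorted(configured - set(registered) - {"default"}):
        problems.append("group " + group + " is configured but not registered")
    for group in registered:
        if group not in configured:
            problems.append("group " + group + " is registered but has no properties")
    return problems

# Abbreviated sample: the applicationlogs group is not registered.
connector_config = json.loads(r"""
{
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.groups": "inventory",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\..*",
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*"
}
""")

print(check_topic_creation(connector_config))
# ['group applicationlogs is configured but not registered']
```

Parsing the document with json.loads also catches syntax slips such as a missing comma between properties, because it raises an error on invalid JSON.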

Additional resources

For more information on topic auto-creation you can have a look at these resources: