Customization of Kafka Connect automatic topic creation
Kafka provides two mechanisms for creating topics automatically.
You can enable automatic topic creation for the Kafka broker, and, beginning with Kafka 2.6.0, you can also enable Kafka Connect to create topics.
The Kafka broker uses the auto.create.topics.enable property to control automatic topic creation.
In Kafka Connect, the topic.creation.enable property specifies whether Kafka Connect is permitted to create topics.
In both cases, the default settings for the properties enable automatic topic creation.
When automatic topic creation is enabled, if a Debezium source connector emits a change event record for a table for which no target topic already exists, the topic is created at runtime as the event record is ingested into Kafka.
Topics that the broker creates are limited to sharing a single default configuration. The broker cannot apply unique configurations to different topics or sets of topics. By contrast, Kafka Connect can apply any of several configurations when creating topics, setting the replication factor, number of partitions, and other topic-specific settings as specified in the Debezium connector configuration. The connector configuration defines a set of topic creation groups, and associates a set of topic configuration properties with each group.
The broker configuration and the Kafka Connect configuration are independent of each other. Kafka Connect can create topics regardless of whether you disable topic creation at the broker. If you enable automatic topic creation both at the broker and in Kafka Connect, the Connect configuration takes precedence, and the broker creates topics only if none of the settings in the Kafka Connect configuration apply.
Disabling automatic topic creation for the Kafka broker
By default, the Kafka broker configuration enables the broker to create topics at runtime if the topics do not already exist. Topics created by the broker cannot be configured with custom properties. If you use a Kafka version earlier than 2.6.0, and you want to create topics with specific configurations, you must disable automatic topic creation at the broker, and then explicitly create the topics, either manually or through a custom deployment process.
- In the broker configuration, set the value of auto.create.topics.enable to false.
Set up Kafka Connect
Automatic topic creation in Kafka Connect is controlled by the topic.creation.enable property.
The default value for the property is true, enabling automatic topic creation, as shown in the following example:
topic.creation.enable = true
The setting for the topic.creation.enable property applies to all workers in the Connect cluster.
Kafka Connect automatic topic creation requires you to define the configuration properties that Kafka Connect applies when creating topics. You specify topic configuration properties in the Debezium connector configuration by defining topic groups, and then specifying the properties to apply to each group. The connector configuration defines a default topic creation group, and, optionally, one or more custom topic creation groups. Custom topic creation groups use lists of topic name patterns to specify the topics to which the group’s settings apply.
For details about how Kafka Connect matches topics to topic creation groups, see Topic creation groups. For more information about how configuration properties are assigned to groups, see Topic creation group configuration properties.
By default, topics that Kafka Connect creates are named based on the pattern server.schema.table, for example, dbserver.myschema.inventory.
If you don’t want to allow automatic topic creation by Kafka Connect, set the value of topic.creation.enable to false in the Kafka Connect configuration.
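Kafka Connect automatic topic creation requires the replication.factor and partitions properties to be set for at least the default topic creation group.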
Configuration
For Kafka Connect to create topics automatically, it requires information from the source connector about the configuration properties to apply when creating topics. You define the properties that control topic creation in the configuration for each Debezium connector. As Kafka Connect creates topics for event records that a connector emits, the resulting topics obtain their configuration from the applicable group. The configuration applies to event records emitted by that connector only.
Topic creation groups
A set of topic properties is associated with a topic creation group.
Minimally, you must define a default topic creation group and specify its configuration properties.
Beyond that you can optionally define one or more custom topic creation groups and specify unique properties for each.
When you create custom topic creation groups, you define the member topics for each group based on topic name patterns.
You can specify naming patterns that describe the topics to include or exclude from each group.
The include and exclude properties contain comma-separated lists of regular expressions that define topic name patterns.
For example, if you want a group to include all topics that start with the string dbserver1.inventory, set the value of its topic.creation.inventory.include property to dbserver1\\.inventory\\.*.
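If you specify both include and exclude properties for a custom topic group, the exclusion rules take precedence and override the inclusion rules.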
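The include and exclude matching described above can be sketched in Python. This is an illustrative model only, not Kafka Connect code: it assumes that a pattern must match the whole topic name and that a matching exclude pattern overrides a matching include pattern. The helper names `compile_patterns` and `in_group` and the sample topic names are hypothetical, and Python's `re` module stands in for the Java regular expressions that Kafka Connect evaluates.

```python
import re


def compile_patterns(csv):
    """Combine a comma-separated list of regular expressions into one pattern."""
    parts = [p.strip() for p in csv.split(",") if p.strip()]
    if not parts:
        return None
    return re.compile("|".join(f"(?:{p})" for p in parts))


def in_group(topic, include, exclude=""):
    """Return True if the topic name belongs to the group.

    Assumption: patterns must match the entire topic name, and a matching
    exclude pattern wins over a matching include pattern.
    """
    inc = compile_patterns(include)
    exc = compile_patterns(exclude)
    if exc is not None and exc.fullmatch(topic):
        return False
    return inc is not None and inc.fullmatch(topic) is not None


# The applicationlogs patterns, written as they read after JSON unescaping.
INCLUDE = r"dbserver1\.logs\.applog-.*"
EXCLUDE = r"dbserver1\.logs\.applog-old-.*"

# Hypothetical topic names used only for illustration.
for name in (
    "dbserver1.logs.applog-orders",
    "dbserver1.logs.applog-old-2019",
    "dbserver1.inventory.customers",
):
    print(name, in_group(name, INCLUDE, EXCLUDE))
```

Under these assumptions, only the first sample name is a member of the group: the second is removed by the exclude pattern, and the third never matches the include pattern.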
Topic creation group configuration properties
The default topic creation group and each custom group are each associated with a unique set of configuration properties.
You can configure a group to include any of the Kafka topic-level configuration properties.
For example, you can specify the cleanup policy for old topic segments, retention time, or the topic compression type for a topic group.
You must define at least a minimum set of properties to describe the configuration of the topics to be created.
If no custom groups are registered, or if the include patterns of the registered groups do not match the name of a topic to be created, Kafka Connect uses the configuration of the default group to create the topic.
For general topic configuration considerations, see Configuring Debezium topics in the Debezium installation guide.
Default group configuration
Before you can use Kafka Connect automatic topic creation, you must create a default topic creation group and define a configuration for it.
The configuration for the default topic creation group is applied to any topics with names that do not match the include list pattern of a custom topic creation group.
- To define properties for the topic.creation.default group, add them to the connector configuration JSON, as shown in the following example:

  {
    ...
    "topic.creation.default.replication.factor": 3,  (1)
    "topic.creation.default.partitions": 10,  (2)
    "topic.creation.default.cleanup.policy": "compact",  (3)
    "topic.creation.default.compression.type": "lz4"  (4)
    ...
  }

You can include any Kafka topic-level configuration property in the configuration for the default group.
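Item | Description |
---|---|
1 | topic.creation.default.replication.factor sets the replication factor for topics that the default group creates. |
2 | topic.creation.default.partitions sets the number of partitions for topics that the default group creates. |
3 | topic.creation.default.cleanup.policy maps to the cleanup.policy topic-level configuration property. |
4 | topic.creation.default.compression.type maps to the compression.type topic-level configuration property. |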
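Custom groups fall back to the default group settings only for the required replication.factor and partitions properties. If the configuration for a custom topic group leaves other properties undefined, the values specified in the default group are not applied.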
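To illustrate how a custom group's settings might combine with the default group, the following Python sketch resolves the effective topic settings for a group from a connector configuration. It is a simplified model, not Kafka Connect code, and it rests on one assumption that you should verify against the Kafka Connect documentation: only replication.factor and partitions are inherited from the default group. The function names `group_settings` and `effective_settings` are hypothetical.

```python
PREFIX = "topic.creation."
# Keys that select topics rather than configure them.
SELECTORS = {"include", "exclude"}
# Assumption: only these two settings are inherited from the default group.
INHERITED = ("replication.factor", "partitions")


def group_settings(config, group):
    """Return the settings defined directly under topic.creation.<group>."""
    prefix = f"{PREFIX}{group}."
    return {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix) and key[len(prefix):] not in SELECTORS
    }


def effective_settings(config, group):
    """Resolve the settings that would apply to topics created for `group`."""
    settings = group_settings(config, group)
    defaults = group_settings(config, "default")
    for name in INHERITED:
        if name not in settings and name in defaults:
            settings[name] = defaults[name]
    return settings


connector_config = {
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\.*",
    "topic.creation.inventory.partitions": 20,
    "topic.creation.inventory.cleanup.policy": "compact",
    "topic.creation.inventory.delete.retention.ms": 7776000000,
}

print(effective_settings(connector_config, "inventory"))
```

In this model, the inventory group keeps its own partitions, cleanup.policy, and delete.retention.ms values and picks up replication.factor from the default group, while the default group's compression.type is not carried over.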
Custom group configuration
You can define multiple custom topic groups, each with its own configuration.
- To define a custom topic group, add a topic.creation.<group_name>.include property to the connector JSON, and list the properties for the custom group after the group name.

  The following example shows sample configurations for the inventory and applicationlogs custom topic creation groups:

  {
    ...
    (1)
    "topic.creation.inventory.include": "dbserver1\\.inventory\\.*",  (2)
    "topic.creation.inventory.partitions": 20,
    "topic.creation.inventory.cleanup.policy": "compact",
    "topic.creation.inventory.delete.retention.ms": 7776000000,

    (3)
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",  (4)
    "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",  (5)
    "topic.creation.applicationlogs.replication.factor": 1,
    "topic.creation.applicationlogs.partitions": 20,
    "topic.creation.applicationlogs.cleanup.policy": "delete",
    "topic.creation.applicationlogs.retention.ms": 7776000000,
    "topic.creation.applicationlogs.compression.type": "lz4",
    ...
  }
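Item | Description |
---|---|
1 | Defines the configuration for the inventory group. |
2 | topic.creation.inventory.include defines the regular expression that selects the topics that belong to the inventory group. |
3 | Defines the configuration for the applicationlogs group. |
4 | topic.creation.applicationlogs.include defines the regular expression that selects topics with names that start with dbserver1.logs.applog-. |
5 | topic.creation.applicationlogs.exclude defines the regular expression that excludes topics with names that start with dbserver1.logs.applog-old-. |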
Registering custom groups
After you specify the configuration for any custom topic creation groups, register the groups.
- Register custom groups by adding the topic.creation.groups property to the connector JSON, and specifying a comma-separated list of groups.

  The following example registers the custom topic creation groups inventory and applicationlogs:

  {
    ...
    "topic.creation.groups": "inventory,applicationlogs",
    ...
  }
The following example shows a completed configuration that includes the configuration for a default topic group, along with the configurations for an inventory and an applicationlogs custom topic creation group:
{
...
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"topic.creation.groups": "inventory,applicationlogs",
"topic.creation.inventory.include": "dbserver1\\.inventory\\.*",
"topic.creation.inventory.partitions": 20,
"topic.creation.inventory.cleanup.policy": "compact",
"topic.creation.inventory.delete.retention.ms": 7776000000,
"topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
"topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
"topic.creation.applicationlogs.replication.factor": 1,
"topic.creation.applicationlogs.partitions": 20,
"topic.creation.applicationlogs.cleanup.policy": "delete",
"topic.creation.applicationlogs.retention.ms": 7776000000,
"topic.creation.applicationlogs.compression.type": "lz4"
}
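Because a custom group takes effect only after it is registered, a quick consistency check of the connector configuration can catch omissions before deployment. The following Python sketch is one hypothetical way to do that; it is not part of Debezium or Kafka Connect. The function names and messages are invented for illustration, and the check assumes that the default group needs replication.factor and partitions.

```python
PREFIX = "topic.creation."
# Assumption: these two properties are required for the default group.
REQUIRED_DEFAULTS = ("replication.factor", "partitions")


def registered_groups(config):
    """Parse the comma-separated topic.creation.groups property."""
    raw = config.get(PREFIX + "groups", "")
    return [name.strip() for name in raw.split(",") if name.strip()]


def defined_groups(config):
    """Return the custom groups that have an include property."""
    suffix = ".include"
    names = {
        key[len(PREFIX):-len(suffix)]
        for key in config
        if key.startswith(PREFIX) and key.endswith(suffix)
    }
    names.discard("default")
    return sorted(names)


def check_topic_creation(config):
    """Return a list of human-readable problems; empty if none are found."""
    problems = []
    for name in REQUIRED_DEFAULTS:
        if f"{PREFIX}default.{name}" not in config:
            problems.append(f"default group is missing {name}")
    registered = registered_groups(config)
    for group in defined_groups(config):
        if group not in registered:
            problems.append(f"group {group} is configured but not registered")
    for group in registered:
        if f"{PREFIX}{group}.include" not in config:
            problems.append(f"group {group} is registered but has no include pattern")
    return problems


complete_config = {
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.groups": "inventory,applicationlogs",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\.*",
    "topic.creation.inventory.partitions": 20,
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
    "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
}

print(check_topic_creation(complete_config))
```

For a configuration shaped like the completed example, the check reports no problems. If the topic.creation.groups property is removed, it reports both custom groups as configured but not registered.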
Additional resources
For more information about automatic topic creation, see the following resources:
- Debezium Blog: Auto-creating Debezium Change Data Topics
- Kafka Improvement Proposal about adding topic auto-creation to Kafka Connect: KIP-158 Kafka Connect should allow source connectors to set topic-specific settings for new topics