Partition Routing
By default, when Debezium detects a change in a data collection, the change event that it emits is sent to a topic that uses a single Apache Kafka partition. As described in Customization of Kafka Connect automatic topic creation, you can customize the default configuration to route events to multiple partitions, based on a hash of the primary key.
However, in some cases, you might also want Debezium to route events to a specific topic partition. The partition routing SMT enables you to route events to specific destination partitions based on the values of one or more specified payload fields. To calculate the destination partition, Debezium uses a hash of the specified field values.
Example: Basic configuration
You configure the partition routing transformation in the Debezium connector’s Kafka Connect configuration. The configuration specifies the following parameters:
partition.payload.fields
    Specifies the fields in the event payload that the SMT uses to calculate the destination partition. You can use dot notation to specify nested payload fields.
partition.topic.num
    Specifies the number of partitions in the destination topic.
partition.hash.function
    Specifies the hash function that the SMT applies to the specified field values to determine the number of the destination partition.
By default, Debezium routes all change event records for a configured data collection to a single Apache Kafka topic. Connectors do not direct event records to specific partitions in the topic.
To configure a Debezium connector to route events to a specific partition, configure the PartitionRouting SMT in the Kafka Connect configuration for the Debezium connector.
For example, you might add the following configuration in your connector configuration.
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...
Based on the preceding configuration, whenever the SMT receives a message that is bound for a topic with a name that begins with the prefix fulfillment, it redirects the message to a specific topic partition.
The SMT computes the target partition from a hash of the value of the name field in the message payload.
By specifying the allTopic predicate, the configuration applies the SMT only to messages that are bound for topics whose names match the pattern fulfillment.*.
The change prefix is a special keyword that enables the SMT to automatically refer to elements in the payload that describe the before or after states of the data.
If a specified field is not present in the event message, the SMT ignores it.
If none of the fields exist in the message, then the transformation ignores the event message entirely, and delivers the original version of the message to the default destination topic.
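The field-resolution rules above can be sketched in a few lines of Python. This is an illustration only, not the SMT's implementation: the function names are hypothetical, the event payload is modeled as a plain dictionary, and the rule that change resolves to after when an after state exists and to before otherwise is an assumption. For simplicity, the sketch also treats a field whose value is null as missing.

```python
from typing import Any, List, Optional


def lookup(payload: dict, path: str) -> Optional[Any]:
    """Follow a dot-separated path through nested dicts; None if a step is missing."""
    node: Any = payload
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def resolve_field(payload: dict, field: str) -> Optional[Any]:
    # Assumption: "change.x" reads "after.x" if an after state exists, else "before.x".
    if field.startswith("change."):
        rest = field[len("change."):]
        state = "after" if payload.get("after") is not None else "before"
        return lookup(payload, f"{state}.{rest}")
    return lookup(payload, field)


def routing_values(payload: dict, fields: List[str]) -> Optional[list]:
    """Return the values to hash, or None when no configured field is present."""
    found = (resolve_field(payload, f) for f in fields)
    values = [v for v in found if v is not None]  # missing fields are skipped
    return values or None  # None: pass the event through unchanged


update = {"before": {"name": "hammer"}, "after": {"name": "hammer", "weight": 0.8}}
delete = {"before": {"name": "rocks"}, "after": None}

print(routing_values(update, ["change.name", "change.purchaser"]))  # ['hammer']
print(routing_values(delete, ["change.name"]))                      # ['rocks']
print(routing_values(update, ["change.purchaser"]))                 # None
```

A result of None corresponds to the case in which the transformation leaves the message untouched.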
The number of partitions specified by the partition.topic.num setting in the SMT configuration must match the number of partitions specified by the Kafka Connect configuration.
For example, in the preceding configuration example, the value specified by the Kafka Connect property topic.creation.default.partitions matches the partition.topic.num value in the SMT configuration.
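Because the two values are set in different places, a mismatch is easy to introduce. The following Python sketch shows one way you might check a connector configuration before you deploy it. It is a hypothetical helper, not part of Debezium or Kafka Connect, and it assumes that every routed topic is created from the default topic creation group.

```python
def parse_properties(text: str) -> dict:
    """Parse key=value lines; skip blanks, comments, and lines without '='."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def mismatched_transforms(props: dict) -> list:
    """List (transform, partition.topic.num, default partitions) for each mismatch."""
    expected = props.get("topic.creation.default.partitions")
    if expected is None:
        return []  # nothing to compare against
    problems = []
    for name in props.get("transforms", "").split(","):
        name = name.strip()
        if not props.get(f"transforms.{name}.type", "").endswith(".PartitionRouting"):
            continue  # not a PartitionRouting transformation
        actual = props.get(f"transforms.{name}.partition.topic.num")
        if actual != expected:
            problems.append((name, actual, expected))
    return problems


config = """
topic.creation.default.partitions=2
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
"""

print(mismatched_transforms(parse_properties(config)))  # []
```

An empty list means that every PartitionRouting transformation in the configuration agrees with the default partition count.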
Given the following Products table:
id | name | description | weight
101 | scooter | Small 2-wheel scooter | 3.14
102 | car battery | 12V car battery | 8.1
103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8
104 | hammer | 12oz carpenter’s hammer | 0.75
105 | hammer | 14oz carpenter’s hammer | 0.875
106 | hammer | 16oz carpenter’s hammer | 1.0
107 | rocks | box of assorted rocks | 5.3
108 | jacket | water resistent black wind breaker | 0.1
109 | spare tire | 24 inch spare tire | 22.2
Based on the configuration, the SMT routes change events for all records in which the name field has the value hammer to the same partition.
That is, the items with id values 104, 105, and 106 are routed to the same partition.
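The grouping follows from hashing the field value and reducing the hash modulo the partition count. The following Python sketch illustrates the idea with the rows from the table. It uses CRC32 from the standard library as a stand-in hash, which is an assumption made for the sake of a deterministic example and not the hash function that the SMT uses, so the partition numbers that it produces will not match those that Debezium computes.

```python
import zlib
from collections import defaultdict


def partition_for(values, num_partitions):
    """Map field values to a partition number in [0, num_partitions)."""
    # Stand-in hash (assumption): CRC32 over the joined string form of the values.
    joined = "\x1f".join(str(v) for v in values)
    return zlib.crc32(joined.encode("utf-8")) % num_partitions


# (id, name) pairs from the Products table.
products = [
    (101, "scooter"), (102, "car battery"), (103, "12-pack drill bits"),
    (104, "hammer"), (105, "hammer"), (106, "hammer"),
    (107, "rocks"), (108, "jacket"), (109, "spare tire"),
]

# Route on the name field with partition.topic.num=2.
by_partition = defaultdict(list)
for product_id, name in products:
    by_partition[partition_for([name], 2)].append(product_id)

for partition, ids in sorted(by_partition.items()):
    print(partition, ids)
```

Whichever hash function is in use, equal field values always produce equal hashes, so ids 104, 105, and 106 appear in the same list. Records with different names can also share a partition, because there are only two partitions to choose from.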
Example: Advanced configuration
Suppose that you want to route events from two data collections (t1, t2) to the same topic (for example, my_topic), and you want to partition events from data collection t1 by using field f1, and partition events from data collection t2 by using field f2.
You could apply the following configuration:
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic
predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic
The preceding configuration does not specify how to re-route events so that they are sent to a specific destination topic. For information about how to send events to topics other than their default destination topics, see the Topic Routing SMT.
Migrating from the Debezium ComputePartition SMT
The Debezium ComputePartition SMT has been discontinued.
The information in the following section describes how to migrate from the ComputePartition SMT to the new PartitionRouting SMT.
Assuming that the configuration sets the same number of partitions for all topics, replace the following ComputePartition configuration with the PartitionRouting SMT.
The following examples provide a comparison of the two configurations.
Example: Legacy ComputePartition configuration
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...
Replace the preceding ComputePartition configuration with the following PartitionRouting configuration.
Example: PartitionRouting configuration that replaces the earlier ComputePartition configuration
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...
If the SMT emits events to topics that do not share the same number of partitions, you must specify unique partition.num.mappings values for each topic.
In the following example, the topic for the legacy products collection is configured with 3 partitions, and the topic for the orders data collection is configured with 2 partitions:
Example: Legacy ComputePartition configuration that sets unique partition values for different topics
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...
Replace the preceding ComputePartition configuration with the following PartitionRouting configuration:
Example: PartitionRouting configuration that sets unique partition.topic.num values for different topics
...
topic.prefix=fulfillment
transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products
transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=orders
predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...
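For connectors that map many data collections, you might generate the replacement properties rather than write them by hand. The following Python sketch derives one PartitionRouting transformation and one predicate per data collection from the two ComputePartition mapping strings. It is a hypothetical helper, not a Debezium tool. The transformation and predicate names are derived from the table name, which is a naming choice made for this example and which would collide if two schemas contained tables with the same name.

```python
def to_partition_routing(topic_prefix, field_mappings, num_mappings):
    """Build PartitionRouting properties from ComputePartition mapping strings."""
    fields = dict(item.split(":", 1) for item in field_mappings.split(","))
    counts = dict(item.split(":", 1) for item in num_mappings.split(","))
    transform_names, predicate_names, body = [], [], []
    for collection, field in fields.items():
        table = collection.rsplit(".", 1)[-1]            # inventory.products -> products
        transform = f"{table.capitalize()}PartitionRouting"
        transform_names.append(transform)
        predicate_names.append(table)
        body += [
            f"transforms.{transform}.type=io.debezium.transforms.partitions.PartitionRouting",
            f"transforms.{transform}.partition.payload.fields=change.{field}",
            f"transforms.{transform}.partition.topic.num={counts[collection]}",
            f"transforms.{transform}.predicate={table}",
            f"predicates.{table}.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
            f"predicates.{table}.pattern={topic_prefix}.{collection}",
        ]
    header = [
        f"transforms={','.join(transform_names)}",
        f"predicates={','.join(predicate_names)}",
    ]
    return header + body


lines = to_partition_routing(
    "fulfillment",
    "inventory.products:name,inventory.orders:purchaser",
    "inventory.products:3,inventory.orders:2",
)
print("\n".join(lines))
```

The output contains the same properties as the preceding PartitionRouting example, in a different order. Review the generated predicate patterns before use, because Kafka Connect interprets them as regular expressions.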
Configuration options
The following table lists the configuration options that you can set for the partition routing SMT.
Property | Default | Description
partition.payload.fields | | Specifies the fields in the event payload that the SMT uses to calculate the target partition. You can use dot notation to specify nested payload fields. To access fields related to data collections, you can use:
partition.topic.num | | The number of partitions for the topic on which this SMT acts. Use the
partition.hash.function | | The hash function that the SMT applies to the specified field values to determine the number of the destination partition. Possible values are: