Partition Routing

By default, when Debezium detects a change in a data collection, the change event that it emits is sent to a topic that uses a single Apache Kafka partition. As described in Customization of Kafka Connect automatic topic creation, you can customize the default configuration to route events to multiple partitions, based on a hash of the primary key.

However, in some cases, you might also want Debezium to route events to a specific topic partition. The partition routing SMT enables you to route events to specific destination partitions based on the values of one or more specified payload fields. To calculate the destination partition, Debezium uses a hash of the specified field values.

Example: Basic configuration

You configure the partition routing transformation in the Debezium connector’s Kafka Connect configuration. The configuration specifies the following parameters:

partition.payload.fields

Specifies the fields in the event payload that the SMT uses to calculate the destination partition. You can use dot notation to specify nested payload fields.

partition.topic.num

Specifies the number of partitions in the destination topic.

partition.hash.function

Specifies the hash function that the SMT applies to the specified field values to determine the number of the destination partition.

By default, Debezium routes all change event records for a configured data collection to a single Apache Kafka topic. Connectors do not direct event records to specific partitions in the topic.

To configure a Debezium connector to route events to a specific partition, configure the PartitionRouting SMT in the Kafka Connect configuration for the Debezium connector.

For example, you might add the following configuration in your connector configuration.

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

Based on the preceding configuration, whenever the SMT receives a message that is bound for a topic with a name that begins with the prefix fulfillment, it redirects the message to a specific topic partition.

The SMT computes the target partition from a hash of the value of the name field in the message payload. By specifying the allTopic predicate, the configuration selectively applies the SMT. The change prefix is a special keyword that enables the SMT to automatically refer to elements in the payload that describe the before or after states of the data.

If a specified field is not present in the event message, the SMT ignores it. If none of the fields exist in the message, then the transformation ignores the event message entirely, and delivers the original version of the message to the default destination topic.

The number of partitions specified by the partition.topic.num setting in the SMT configuration must match the number of partitions specified by the Kafka Connect configuration. For example, in the preceding configuration example, the value specified by the Kafka Connect property topic.creation.default.partitions matches the partition.topic.num value in the SMT configuration.
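The field-selection rules described above can be sketched in Python. This is only an illustration, not the SMT's implementation: the function names are hypothetical, the event is modeled as a plain dictionary, and the sketch assumes that the change prefix resolves to the before element for delete events (op value d) and to the after element otherwise. It also treats a field with a null value the same as a missing field, which is a simplification.

```python
from typing import Any, List, Optional


def resolve_field(payload: dict, path: str) -> Optional[Any]:
    """Return the value at a dotted path, or None if any segment is missing."""
    head, _, rest = path.partition(".")
    if head == "change":
        # Assumption: deletes carry the row state in 'before', other operations in 'after'.
        head = "before" if payload.get("op") == "d" else "after"
    node = payload.get(head)
    for part in rest.split(".") if rest else []:
        if not isinstance(node, dict):
            return None
        node = node.get(part)
    return node


def routing_values(payload: dict, fields: List[str]) -> Optional[List[Any]]:
    """Collect the values of the configured fields, skipping absent ones.

    Returns None when no configured field is present, which models the case
    in which the message passes through unchanged.
    """
    values = [v for v in (resolve_field(payload, f) for f in fields) if v is not None]
    return values or None


create_event = {"op": "c", "before": None, "after": {"id": 104, "name": "hammer"}}
delete_event = {"op": "d", "before": {"id": 104, "name": "hammer"}, "after": None}

print(routing_values(create_event, ["change.name"]))
print(routing_values(delete_event, ["change.name"]))
print(routing_values(create_event, ["change.purchaser"]))
```

In this sketch, both the create event and the delete event yield the same routing value, so a hash over that value would place them in the same partition.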

Given the following Products table:

Table 1. Products table

id  | name               | description                                             | weight
----+--------------------+---------------------------------------------------------+-------
101 | scooter            | Small 2-wheel scooter                                   | 3.14
102 | car battery        | 12V car battery                                         | 8.1
103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8
104 | hammer             | 12oz carpenter’s hammer                                 | 0.75
105 | hammer             | 14oz carpenter’s hammer                                 | 0.875
106 | hammer             | 16oz carpenter’s hammer                                 | 1.0
107 | rocks              | box of assorted rocks                                   | 5.3
108 | jacket             | water resistent black wind breaker                      | 0.1
109 | spare tire         | 24 inch spare tire                                      | 22.2

Based on the configuration, the SMT routes change events for all records in which the name field has the value hammer to the same partition. That is, the events for the items with id values 104, 105, and 106 are routed to the same partition.
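The following Python sketch illustrates why events with equal field values share a partition. It is a sketch under stated assumptions: CRC32 is used as a stand-in for the hash function, and the partition is taken as the hash modulo the partition count. The source states only that the SMT uses a hash of the field values, so the actual partition numbers that Debezium assigns are likely to differ from the ones this code computes.

```python
import zlib

NUM_PARTITIONS = 2  # mirrors partition.topic.num in the example configuration

# id -> name, taken from the Products table
PRODUCTS = {
    101: "scooter",
    102: "car battery",
    103: "12-pack drill bits",
    104: "hammer",
    105: "hammer",
    106: "hammer",
    107: "rocks",
    108: "jacket",
    109: "spare tire",
}


def partition_for(value: str, num_partitions: int) -> int:
    """Map a field value to a partition number.

    Assumption: CRC32 stands in for the real hash function, and the
    partition is the hash modulo the number of partitions.
    """
    return zlib.crc32(value.encode("utf-8")) % num_partitions


assignments = {pid: partition_for(name, NUM_PARTITIONS) for pid, name in PRODUCTS.items()}

for pid, partition in assignments.items():
    print(f"id={pid} name={PRODUCTS[pid]!r} -> partition {partition}")
```

Because the partition depends only on the value of the name field, the three hammer rows always land together, regardless of which deterministic hash function is used.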

Example: Advanced configuration

Suppose that you want to route events from two data collections (t1, t2) to the same topic (for example, my_topic), and you want to partition events from data collection t1 by using field f1, and partition events from data collection t2 by using field f2.

You could apply the following configuration:

transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic

predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic

The preceding configuration does not specify how to re-route events so that they are sent to a specific destination topic. For information about how to send events to topics other than their default destination topics, see the Topic Routing SMT.

Migrating from the Debezium ComputePartition SMT

The Debezium ComputePartition SMT has been discontinued. The information in the following section describes how to migrate from the ComputePartition SMT to the new PartitionRouting SMT.

Assuming that the configuration sets the same number of partitions for all topics, replace the following ComputePartition configuration with the PartitionRouting SMT. The following examples provide a comparison of the two configurations.

Example: Legacy ComputePartition configuration
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...

Replace the preceding ComputePartition configuration with the following PartitionRouting configuration.

Example: PartitionRouting configuration that replaces the earlier ComputePartition configuration

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

If the SMT emits events to topics that do not share the same number of partitions, you must specify unique partition.num.mappings values for each topic. For example, in the following legacy configuration, the topic for the products data collection is configured with 3 partitions, and the topic for the orders data collection is configured with 2 partitions:

Example: Legacy ComputePartition configuration that sets unique partition values for different topics
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...

Replace the preceding ComputePartition configuration with the following PartitionRouting configuration:

Example: PartitionRouting configuration that sets unique partition.topic.num values for different topics

...
topic.prefix=fulfillment

transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products

transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=orders

predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...
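When a configuration chains several PartitionRouting instances, it can help to list which predicate, topic pattern, and partition count each transform alias ends up with. The following Python sketch does that for properties text in the same shape as the examples above. The helper names are hypothetical, the sample text is an abbreviated illustration, and the parser handles only simple key=value lines rather than the full Java properties syntax.

```python
from typing import Dict, List

SAMPLE = """
topic.prefix=fulfillment
transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=orders
predicates=products,orders
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.pattern=fulfillment.inventory.orders
"""


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines; skip blanks, comments, and '...' placeholders."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def split_list(value: str) -> List[str]:
    return [item.strip() for item in value.split(",") if item.strip()]


def summarize_routing(props: Dict[str, str]) -> Dict[str, dict]:
    """For each transform alias, report its fields, partition count, and topic pattern."""
    summary = {}
    for alias in split_list(props.get("transforms", "")):
        prefix = f"transforms.{alias}."
        predicate = props.get(prefix + "predicate")
        summary[alias] = {
            "fields": split_list(props.get(prefix + "partition.payload.fields", "")),
            "partitions": int(props[prefix + "partition.topic.num"]),
            "topic_pattern": props.get(f"predicates.{predicate}.pattern"),
        }
    return summary


def unused_predicates(props: Dict[str, str]) -> List[str]:
    """Return declared predicates that no transform references."""
    declared = set(split_list(props.get("predicates", "")))
    used = {props.get(f"transforms.{alias}.predicate")
            for alias in split_list(props.get("transforms", ""))}
    return sorted(declared - used)


props = parse_properties(SAMPLE)
for alias, info in summarize_routing(props).items():
    print(alias, info)
print("unused predicates:", unused_predicates(props))
```

A declared predicate that no transform references usually indicates a copy-and-paste slip, such as two transforms that point at the same predicate.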

Configuration options

The following table lists the configuration options that you can set for the partition routing SMT.

Table 2. Partition routing SMT (PartitionRouting) configuration options

Property: partition.payload.fields
Default: (none)
Description: Specifies the fields in the event payload that the SMT uses to calculate the target partition. Use dot notation to specify nested payload fields. To access fields related to data collections, you can use: after, before, or change. The change field is a special field that results in the SMT automatically referring to content in the after or before elements, depending on the type of operation. If a specified field is not present in a record, the SMT skips it. For example, after.name,source.table,change.name

Property: partition.topic.num
Default: (none)
Description: The number of partitions for the topic on which this SMT acts. Use the TopicNameMatches predicate to filter records by topic.

Property: partition.hash.function
Default: java
Description: The hash function that the SMT applies to the specified field values to determine the number of the destination partition. Possible values are:

java - standard Java Object::hashCode function

murmur - latest version of MurmurHash function, MurmurHash3

This configuration is optional. If you do not specify a value, or if you specify an invalid value, the SMT uses the default value.
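To give a sense of what the java option implies for a single string value, the following Python sketch reproduces the documented behavior of Java's String.hashCode (a 32-bit signed polynomial hash with multiplier 31). It is only an illustration: how the SMT combines several field values, how it handles non-string values, and how it reduces the hash to a partition number are not described here, so the final modulo step is an assumption.

```python
def java_string_hash(s: str) -> int:
    """Emulate Java's String.hashCode for strings in the Basic Multilingual Plane.

    Java hashes UTF-16 code units; for BMP characters these equal the
    Unicode code points that ord() returns.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # keep 32 bits, as Java int arithmetic does
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - 0x100000000 if h >= 0x80000000 else h


def partition_for(value: str, num_partitions: int) -> int:
    # Assumption: the hash is reduced with a non-negative modulo.
    # Python's % already yields a non-negative result for a positive divisor.
    return java_string_hash(value) % num_partitions


print(java_string_hash("hello"))
print(partition_for("hammer", 2))
```

Because Java hash codes can be negative, any reduction to a partition number has to guard against negative results, which is why the sketch relies on a non-negative modulo.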