Content-Based Routing

This SMT is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems while using this routing transformation.

With real-life applications, it is often necessary to route the events not only to a static topic but to a topic based on the message content. This is expressed as the Content-Based Router integration pattern. Kafka Connect provides a generic mechanism to do the routing in the form of Simple Message Transforms (SMT).

The SMT is a Java class that encodes the routing logic. This is a very powerful mechanism but has two drawbacks:

  • It is necessary to compile the transformation upfront and deploy it to Kafka Connect.

  • Every change needs code recompilation and redeployment, leading to inflexible operations.

To solve this problem, Debezium comes with the Content-Based Router SMT. This SMT allows the operator to write an expression that is evaluated for each event and according to the result, it is either kept in the original topic or re-routed to another one. The current implementation supports any scripting language which integrates with JSR 223 (Scripting for the Java™ Platform).

The Content-Based Router SMT can be used like so:

...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...

In this example, we are using Groovy as the expression language, and we’re re-routing all update records to the updates topic and the others into the default one.

Debezium does not come with the language implementations in its installation packages. It is the user’s responsibility to provide an implementation, such as Groovy 3 or GraalVM JavaScript, on the classpath. Bootstrapping is done exclusively via the JSR 223 API currently, so the engine’s support for this API must be provided as well.

Debezium binds four variables into the evaluation context:

  • key - a key of the message

  • value - a value of the message

  • keySchema - the schema of the message key

  • valueSchema - the schema of the message value

  • topic - the name of the target topic

  • headers - the map of message headers keyed with header name and value composed of schema and value variables

The key and value are of type org.apache.kafka.connect.data.Struct and keySchema and valueSchema are variables of type org.apache.kafka.connect.data.Schema. The expression can invoke arbitrary methods on the variables and should evaluate into a boolean value that decides whether the message is removed true or kept. Expressions should be side-effect free, i.e. they should not modify the passed variables in any way.

Language specifics

The same business logic - re-route all update records an be expressed like this, depending on your preferred scripting language; In case of Groovy, the value fields can be accessed in a property-like way:

value.op == 'u' ? 'updates' : null

The Debezium MongoDB connector emits the after and patch fields not as structures but as serialized JSON documents. This means that when you are using the SMT with the MongoDB connector you should either first apply `ExtractNewDocumentState) SMT to unwind the field or use a JSON parser in the expression.

In case of Groovy it means adding groovy-json artifact on the classpath and write the expression like (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.

Other languages, such as JavaScript, will typically require to call the Struct#get() method:

value.get('op') == 'u' ? 'updates' : null

When using JavaScript via Graal.js, simplified property references can be used, akin to the Groovy approach:

value.op == 'u' ? 'updates' : null

Configuration options

Property

Default

Description

An optional regular expression for specifying the topic(s) this transformation should be applied to. Records on any topic whose name does not match the given expression are passed on as-is.

The language in which the expression is written. Must begin with jsr223., e.g. jsr223.groovy, or jsr223.graal.js. Currently, only bootstrapping via the JSR 223 API ("Scripting for the Java ™ Platform") is supported.

The expression evaluated for every message. Must evaluate to a String value where a result of non-null will re-route the message to a new topic, and a null value will keep it in the original topic.

keep

Prescribes how the transformation should handle null (tombstone) messages. The options are: keep (the default) to pass the message through, drop to remove the messages completely, or evaluate to run the message through the topic expression.