Message Filtering

This SMT is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems while using this filter transformation.

With real-life applications, it is often necessary to deliver only a subset of events into the Kafka broker. The user might need to filter the events according to different business rules. Kafka Connect provides a generic mechanism to do the filtering in the form of Simple Message Transforms (SMT).

The SMT is a Java class that encodes the filtering logic. This is a very powerful mechanism but has two drawbacks:

  • It is necessary to compile the transformation upfront and deploy it to Kafka Connect.

  • Every change needs code recompilation and redeployment, leading to inflexible operations.

To solve this problem, Debezium comes with the Filtering SMT. This SMT allows the operator to write an expression that is evaluated for each event and according to the result, it is either filtered out or passed on for further processing. The current implementation supports any scripting language which integrates with JSR 223 (Scripting for the Java™ Platform).

The Filtering SMT can be used like so:

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

In this example we are using Groovy as expression language, and we’re filtering out all update records with id field equal to 2.

Debezium does not come with the language implementations in its installation packages. It is the user’s responsibility to provide an implementation, such as Groovy 3 or GraalVM JavaScript, on the classpath. Bootstrapping is done exclusively via the JSR 223 API currently, so the engine’s support for this API must be provided as well.

Debezium binds four variables into the evaluation context:

  • key - a key of the message

  • value - a value of the message

  • keySchema - the schema of the message key

  • valueSchema - the schema of the message value

  • topic - the name of the target topic

  • headers - the map of message headers keyed with header name and value composed of schema and value variables

The key and value are of type org.apache.kafka.connect.data.Struct and keySchema and valueSchema are variables of type org.apache.kafka.connect.data.Schema. The expression can invoke arbitrary methods on the variables and should evaluate into a boolean value that decides whether the message is removed true or kept. Expressions should be side-effect free, i.e. they should not modify the passed variables in any way.

Language specifics

The same business logic - remove all update records with id set to 2 can be expressed like this, depending on your preferred scripting language; In case of Groovy, the value fields can be accessed in a property-like way:

value.op == 'u' && value.before.id == 2

The Debezium MongoDB connector emits the after and patch fields not as structures but as serialized JSON documents. This means that when you are using the SMT with the MongoDB connector you should either first apply `ExtractNewDocumentState) SMT to unwind the field or use a JSON parser in the expression.

In case of Groovy it means adding groovy-json artifact on the classpath and write the expression like (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'.

Other languages, such as JavaScript, will typically require to call the Struct#get() method:

value.get('op') == 'u' && value.get('before').get('id') == 2

When using JavaScript via Graal.js, simplified property references can be used, akin to the Groovy approach:

value.op == 'u' && value.before.id == 2

Configuration options

Property

Default

Description

An optional regular expression for specifying the topic(s) this transformation should be applied to. Records on any topic whose name does not match the given expression are passed on as-is.

The language in which the expression is written. Must begin with jsr223., e.g. jsr223.groovy, or jsr223.graal.js. Currently, only bootstrapping via the JSR 223 API ("Scripting for the Java ™ Platform") is supported.

The expression evaluated for every message. Must evaluate to a boolean value where a result of true will keep the message, and a result of false will remove it.

keep

Prescribes how the transformation should handle null (tombstone) messages. The options are: keep (the default) to pass the message through, drop to remove the messages completely or evaluate to run the message through the condition expression.