Applying transformations selectively

When you configure a single message transformation (SMT) for a connector, you can define a predicate for the transformation. The predicate specifies how to apply the transformation conditionally to a subset of the messages that the connector processes. You can assign predicates to transformations that you configure for source connectors, such as Debezium, or to sink connectors.

SMT predicates

Debezium provides several single message transformations (SMTs) that you can use to modify event records before Kafka Connect saves the records to Kafka topics. By default, when you configure one of these SMTs for a Debezium connector, Kafka Connect applies that transformation to every record that the connector emits. However, there might be instances in which you want to apply a transformation selectively, so that it modifies only that subset of change event messages that share a common characteristic.

For example, for a Debezium connector, you might want to run the transformation only on event messages from a specific table or that include a specific header key. In environments that run Apache Kafka 2.6 or greater, you can append a predicate statement to a transformation to instruct Kafka Connect to apply the SMT only to certain records. In the predicate, you specify a condition that Kafka Connect uses to evaluate each message that it processes. When a Debezium connector emits a change event message, Kafka Connect checks the message against the configured predicate condition. If the condition is true for the event message, Kafka Connect applies the transformation, and then writes the message to a Kafka topic. Messages that do not match the condition are sent to Kafka unmodified.

The situation is similar for predicates that you define for a sink connector SMT. The connector reads messages from a Kafka topic and Kafka Connect evaluates the messages against the predicate condition. If a message matches the condition, Kafka Connect applies the transformation and then passes the messages to the sink connector.

After you define a predicate, you can reuse it and apply it to multiple transforms. Predicates also include a negate option that you can use to invert a predicate so that the predicate condition is applied only to records that do not match the condition that is defined in the predicate statement. You can use the negate option to pair the predicate with other transforms that are based on negating the condition.

Predicate elements

Predicates include the following elements:

  • predicates prefix

  • Alias (for example, isOutboxTable)

  • Type (for example, org.apache.kafka.connect.transforms.predicates.TopicNameMatches). Kafka Connect provides a set of default predicate types, which you can supplement by defining your own custom predicates.

  • Condition statement and any additional configuration properties, depending on the type of predicate (for example, a regex naming pattern)

Default predicate types

The following predicate types are available by default:

HasHeaderKey

Specifies a key name in the header in the event message that you want Kafka Connect to evaluate. The predicate evaluates to true for any records that include a header key that has the specified name.

RecordIsTombstone

Matches Kafka tombstone records. The predicate evaluates to true for any record that has a null value. Use this predicate in combination with a filter SMT to remove tombstone records. This predicate has no configuration parameters.

A tombstone in Kafka is a record that has a key with a 0-byte, null payload. When a Debezium connector processes a delete operation in the source database, the connector emits two change events for the delete operation:

  • A delete operation ("op" : "d") event that provides the previous value of the database record.

  • A tombstone event that has the same key, but a null value.

    The tombstone represents a delete marker for the row. When log compaction is enabled for Kafka, during compaction Kafka removes all events that share the same key as the tombstone. Log compaction occurs periodically, with the compaction interval controlled by the delete.retention.ms setting for the topic.

    Although it is possible to configure Debezium so that it does not emit tombstone events, it’s best to permit Debezium to emit tombstones to maintain the expected behavior during log compaction. Suppressing tombstones prevents Kafka from removing records for a deleted key during log compaction. If your environment includes sink connectors that cannot process tombstones, you can configure the sink connector to use an SMT with the RecordIsTombstone predicate to filter out the tombstone records.

TopicNameMatches

A regular expression that specifies the name of a topic that you want Kafka Connect to match. The predicate is true for connector records in which the topic name matches the specified regular expression. Use this predicate to apply an SMT to records based on the name of the source table.

Defining SMT predicates

Configuring Kafka Connect predicates is similar to configuring transforms. You specify a predicate alias, associate the alias with a transform, and then define the type and configuration for the predicate.

Prerequisites
  • The Debezium environment runs Apache Kafka 2.6 or greater.

  • An SMT is configured for the Debezium connector.

Procedure
  1. In the Debezium connector configuration, specify a predicate alias for the predicates parameter, for example, IsOutboxTable.

  2. Associate the predicate alias with the transform that you want to apply conditionally, by appending the predicate alias to the transform alias in the connector configuration:

    transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>

    For example:

    transforms.outbox.predicate=IsOutboxTable
  3. Configure the predicate by specifying its type and providing values for configuration parameters.

    1. For the type, specify one of the following default types that are available in Kafka Connect:

      • HasHeaderKey

      • RecordIsTombstone

      • TopicNameMatches

        For example:

        predicates.IsOutboxTable.type=org.apache.kafka.connect.predicates.TopicNameMatch
    2. For the TopicNameMatch or HasHeaderKey predicates, specify a regular expression for the topic or header name that you want to match.

      For example:

      predicates.IsOutboxTable.pattern=outbox.event.*
  4. If you want to negate a condition, append the negate keyword to the transform alias and set it to true.

    For example:

    transforms.outbox.negate=true

    The preceding property inverts the set of records that the predicate matches, so that Kafka Connect applies the transform to any record that does not match the condition specified in the predicate.

Example: TopicNameMatch predicate for the outbox event router transformation

The following example shows a Debezium connector configuration that applies the outbox event router transformation only to messages that Debezium emits to the Kafka outbox.event.order topic.

Because the TopicNameMatch predicate evaluates to true only for messages from the outbox table (outbox.event.*), the transformation is not applied to messages that originate from other tables in the database.

transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.predicates.TopicNameMatch
predicates.IsOutboxTable.pattern=outbox.event.*

Ignoring tombstone events

You can control whether Debezium emits tombstone events, and how long Kafka retains them. Depending on your data pipeline, you might want to set the tombstones.on.delete property for a connector so that Debezium does not emit tombstone events.

Whether you enable Debezium to emit tombstones depends on how topics are consumed in your environment and by the characteristics of the sink consumer. Some sink connectors rely on tombstone events to remove records from downstream data stores. In cases where sink connectors rely on tombstone records to indicate when to delete records in downstream data stores, configure Debezium to emit them.

When you configure Debezium to generate tombstones, further configuration is required to ensure that sink connectors receive the tombstone events. The retention policy for a topic must be set so that the connector has time to read event messages before Kafka removes them during log compaction. The length of time that a topic retains tombstones before compaction is controlled by the delete.retention.ms property for the topic.

By default, the tombstones.on.delete property for a connector is set to true so that the connector generates a tombstone after each delete event. If you set the property to false to prevent Debezium from saving tombstone records to Kafka topics, the absence of tombstone records might lead to unintended consequences. Kafka relies on tombstone during log compaction to remove records that are related to a deleted key.

If you need to support sink connectors or downstream Kafka consumers that cannot process records with null values, rather than preventing Debezium from emitting tombstones, consider configuring an SMT for the connector with a predicate that uses the RecordIsTombstone predicate type to remove tombstone messages before consumers read them.

Procedure
  • To prevent Debezium from emitting tombstone events for deleted database records, set the connector option tombstones.on.delete to false.

    For example:

    “tombstones.on.delete”: “false”