Event Record Changes
This single message transformation (SMT) is supported only for the SQL database connectors.
A Debezium data change event has a complex structure that provides a wealth of information.
However, in some cases, before a downstream consumer can process Debezium change event messages, it requires additional information about field-level changes that result from the original database change.
To enhance event messages with details about how a database operation modifies fields in the source database, Debezium provides the ExtractChangedRecordState single message transformation (SMT).
The event changes transformation is a Kafka Connect SMT.
Change event structure
Debezium generates data change events that have a complex structure. Each event consists of the following parts:
- Metadata, which includes but is not limited to the following types:
  - The type of operation that changed the data.
  - Source information, such as the names of the database and the table in which the change occurred.
  - Timestamp that identifies when the change was made.
  - Optional transaction information.
- Row data before a change.
- Row data after a change.
The following example shows part of the structure of a typical Debezium UPDATE change event:

    {
        "op": "u",
        "source": {
            ...
        },
        "ts_ms" : "...",
        "ts_us" : "...",
        "ts_ns" : "...",
        "before" : {
            "field1" : "oldvalue1",
            "field2" : "oldvalue2"
        },
        "after" : {
            "field1" : "newvalue1",
            "field2" : "newvalue2"
        }
    }

More details about change event structure are provided in the documentation for each connector.
The complex format of the message in the preceding example provides detailed information about changes that occur in the source database.
However, the format might not be suitable for some downstream consumers.
Sink connectors or other parts of the Kafka ecosystem might expect the message to explicitly identify the fields that a database operation changes or leaves unchanged.
The ExtractChangedRecordState SMT adds headers to the change event message to identify the fields that are modified by a database operation, and the fields that remain unchanged.
Behavior
The event changes SMT extracts the before and after fields from a Debezium UPDATE change event in a Kafka record.
The transformation examines the before and after event state structures to identify the fields that are altered by an operation, and those that remain unchanged.
Depending on the connector configuration, the transformation then produces a modified event message that adds message headers to list the changed fields, the unchanged fields, or both.
If the event represents an INSERT or DELETE, this single message transformation has no effect.
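
The comparison described above can be illustrated with a minimal Python sketch. This is not Debezium's implementation. It assumes the event value is already parsed into a plain dictionary, compares only top-level fields, and uses a hypothetical helper name, `changed_and_unchanged`.

```python
def changed_and_unchanged(event):
    """Return (changed, unchanged) field-name lists for an UPDATE event, else None.

    Illustrative only: compares top-level fields of "before" and "after".
    """
    before = event.get("before")
    after = event.get("after")
    # Only UPDATE events carry both states; other events are left untouched.
    if event.get("op") != "u" or before is None or after is None:
        return None
    changed = [name for name in after if before.get(name) != after[name]]
    unchanged = [name for name in after if before.get(name) == after[name]]
    return changed, unchanged


event = {
    "op": "u",
    "before": {"field1": "oldvalue1", "field2": "same"},
    "after": {"field1": "newvalue1", "field2": "same"},
}
changed, unchanged = changed_and_unchanged(event)
print(",".join(changed))    # field1
print(",".join(unchanged))  # field2
```

In this sketch, the two comma-separated strings stand in for the values that the SMT would place in the changed and unchanged headers.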
You can configure the event changes SMT for a Debezium connector, or for a sink connector that consumes messages emitted by a Debezium connector. Configure the event changes SMT for a sink connector if you want Apache Kafka to retain the entire original Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
Depending on your use case, you can configure the transformation to modify the original message by performing one or both of the following tasks:
- Identify the fields that are changed by an UPDATE event by listing them in the user-configured header.changed.name header.
- Identify the fields that are not changed by an UPDATE event by listing them in the user-configured header.unchanged.name header.
Configuration
You configure the Debezium event changes SMT for a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. To obtain the default behavior, which doesn’t add any headers, add the transformation to the connector configuration, as in the following example:
transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
As with any Kafka Connect connector configuration, you can set transforms= to multiple comma-separated SMT aliases, listed in the order in which you want Kafka Connect to apply the SMTs.
The connector configuration in the following example sets several options for the event changes SMT:
transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed.name=Changed
transforms.changes.header.unchanged.name=Unchanged
- header.changed.name: The Kafka message header name to use for storing a comma-separated list of the fields that are changed by a database operation.
- header.unchanged.name: The Kafka message header name to use for storing a comma-separated list of the fields that remain unchanged after a database operation.
The connector might emit many types of event messages (heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes). To apply the transformation to a subset of events, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
Options for applying the event changes transformation selectively
In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.
For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.
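
As a rough sketch of selective application, the following Python snippet assembles a connector configuration fragment that attaches a Kafka Connect predicate to the SMT and prints it as JSON. The predicate alias `isDataChange` and the topic pattern are hypothetical and must be adapted to your own topic naming. The fragment omits all other connector settings.

```python
import json

# Fragment of a connector configuration; merge it into a full connector config.
smt_config = {
    "transforms": "changes",
    "transforms.changes.type": "io.debezium.transforms.ExtractChangedRecordState",
    "transforms.changes.header.changed.name": "Changed",
    "transforms.changes.header.unchanged.name": "Unchanged",
    # Apply the SMT only to records that satisfy the named predicate.
    "transforms.changes.predicate": "isDataChange",
    "predicates": "isDataChange",
    "predicates.isDataChange.type":
        "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    # Hypothetical pattern: matches topics such as dbserver1.inventory.customers.
    "predicates.isDataChange.pattern": "dbserver1[.]inventory[.].*",
}

payload = json.dumps(smt_config, indent=2)
print(payload)
```

With a predicate like this one, records on topics that do not match the pattern, such as heartbeat or schema change topics, pass through without being processed by the SMT.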
Configuration options
The following table describes the options that you can specify to configure the event changes SMT.
Option | Default | Description |
---|---|---|
header.changed.name | | The Kafka message header name to use for storing a comma-separated list of the fields that are changed by a database operation. |
header.unchanged.name | | The Kafka message header name to use for storing a comma-separated list of the fields that remain unchanged after a database operation. |