New Record State Extraction

This SMT is supported only for the SQL database connectors; it does not work with the MongoDB connector. The MongoDB connector has its own equivalent of this SMT.

Debezium generates data change events in the form of a complex message structure. Each message consists of three parts:

  • operation and metadata

  • the row data before change

  • the row data after change

For example, the general message structure for an update change looks like this:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

More details about the message structure are provided in the documentation for each connector.

This format gives the user the most information about changes happening in the system. The downside of the complex format is that other connectors and other parts of the Kafka ecosystem usually expect data in a simple message format that can generally be described like so:

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

To bridge the gap between the complex and simple formats, Debezium provides a single message transformation (SMT): the UnwrapFromEnvelope SMT.

The SMT provides three main functions. It

  • extracts the after field from change events and replaces the original event with just this part

  • optionally filters delete and tombstone records, as per the capabilities and requirements of downstream consumers

  • optionally adds metadata fields from the change event’s source field, prefixing them with "__"
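The first of these functions can be pictured with a small sketch over plain Python dictionaries. This is an illustration of the idea only, not Debezium's implementation: the function name extract_after is hypothetical, and the event is a plain dict shaped like the update message shown earlier rather than a Kafka Connect record.

```python
def extract_after(envelope):
    """Return only the 'after' part of a change event envelope.

    Hypothetical helper for illustration; 'envelope' is a plain dict,
    not a Kafka Connect record.
    """
    return envelope.get("after")


# An update event shaped like the general message structure above.
update_event = {
    "op": "u",
    "source": {},
    "ts_ms": 0,  # placeholder value
    "before": {"field1": "oldvalue1", "field2": "oldvalue2"},
    "after": {"field1": "newvalue1", "field2": "newvalue2"},
}

flat = extract_after(update_event)
print(flat)
```

The result contains only the new row state, which is the simple format that downstream consumers typically expect.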

The SMT can be applied either to a source connector (Debezium) or to a sink connector. We generally recommend applying the transformation on the sink side, because the messages stored in Apache Kafka then retain the whole context. The final decision depends on each user's use case.

Configuration

The SMT is configured as part of the source or sink connector configuration and is expressed as a set of properties:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.source.fields=table,lsn

Record filtering for delete records

The SMT provides special handling for events that signal a delete operation. When a DELETE is executed on a data source, Debezium generates two events:

  • a record with the d operation that contains only the old row data

  • (optionally) a record with a null value and the same key (a "tombstone" message). This record serves as a marker for Apache Kafka that all messages with this key can be removed from the topic during log compaction.

Upon processing these two records, the SMT can pass on the d record as is, convert it into another tombstone record or drop it. The original tombstone message can be passed on as is or also be dropped.

By default, the SMT filters out both of these records, because widely used sink connectors do not support handling of tombstone messages at this point.
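The interplay of the two delete-related options can be sketched as follows. This is a simplified model under stated assumptions, not the SMT's actual code: records are (key, value) tuples of plain Python objects, the function name handle_records is hypothetical, the rewrite mode is assumed to carry the old row data for a delete, and the __deleted marker is represented as a Python boolean.

```python
def handle_records(records, drop_tombstones=True, delete_handling_mode="drop"):
    """Model drop.tombstones and delete.handling.mode on (key, value) pairs.

    Hypothetical helper for illustration only. A value of None stands
    for a tombstone; any other value is a change event envelope (dict).
    """
    if delete_handling_mode not in ("drop", "rewrite", "none"):
        raise ValueError("unknown delete.handling.mode: " + delete_handling_mode)

    out = []
    for key, value in records:
        if value is None:
            # Tombstone: passed on only when drop.tombstones is false.
            if not drop_tombstones:
                out.append((key, None))
            continue

        if value["op"] == "d":
            if delete_handling_mode == "drop":
                continue  # the delete event is filtered out
            if delete_handling_mode == "rewrite":
                # Assumption: the rewritten record carries the old row data.
                row = dict(value["before"])
                row["__deleted"] = True
                out.append((key, row))
            else:
                # "none": no special handling; the extracted 'after'
                # part of a delete event is empty.
                out.append((key, value.get("after")))
            continue

        # Inserts and updates: keep only the new row state.
        row = dict(value["after"])
        if delete_handling_mode == "rewrite":
            row["__deleted"] = False
        out.append((key, row))
    return out


delete_event = {"op": "d", "before": {"id": 1, "name": "old"}, "after": None}
stream = [(1, delete_event), (1, None)]

default_result = handle_records(stream)
rewrite_result = handle_records(
    stream, drop_tombstones=False, delete_handling_mode="rewrite"
)
```

With the defaults, nothing from the delete reaches the consumer. With rewrite and drop.tombstones=false, the consumer sees a flagged row followed by the original tombstone.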

Adding source metadata fields

The SMT can optionally add metadata fields from the original change event’s source structure to the final flattened record (prefixed with "__"). This functionality can be used to add things like the table from the change event, or connector-specific fields like the Postgres LSN field. For more information on what’s available in the source structure, see the documentation for each connector.

For example, the configuration

transforms.unwrap.add.source.fields=table,lsn

will add

{ "__table": "MY_TABLE", "__lsn": "123456789", ...}

to the final flattened record.

For DELETE events, this option is only supported when the delete.handling.mode option is set to "rewrite".
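For a non-delete event, the effect of this option can be sketched with plain Python dictionaries. This is an illustration only: the function name add_source_fields is hypothetical, the sample values are the ones from the example above, and the behaviour for a field that is missing from the source structure (a KeyError here) is a choice of this sketch, not documented SMT behaviour.

```python
def add_source_fields(envelope, option_value):
    """Flatten an envelope and copy selected 'source' fields into it.

    Hypothetical helper for illustration. 'option_value' is the
    comma-separated string given to add.source.fields.
    """
    flat = dict(envelope["after"])
    for name in option_value.split(","):
        name = name.strip()
        if name:
            # Each metadata field is prefixed with "__" in the result.
            # A missing source field raises KeyError in this sketch.
            flat["__" + name] = envelope["source"][name]
    return flat


event = {
    "op": "u",
    "source": {"table": "MY_TABLE", "lsn": "123456789"},
    "before": {"field1": "oldvalue1"},
    "after": {"field1": "newvalue1"},
}

flat = add_source_fields(event, "table,lsn")
print(flat)
```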

Configuration options

Property | Default | Description
drop.tombstones | true | The SMT removes the tombstone generated by Debezium from the stream.
delete.handling.mode | drop | The SMT can drop (the default), rewrite or pass delete events (none). The rewrite mode will add a __deleted column with true/false values based on record operation.
add.source.fields | | Fields from the change event’s source structure to add as metadata (prefixed with "__") to the flattened record
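To see how the defaults combine with explicitly set properties, the following sketch resolves the effective options from a set of connector properties. It is a hypothetical helper, not part of Debezium or Kafka Connect: the name effective_options and the transforms.unwrap. prefix handling are illustrative, and treating an unset add.source.fields as "no fields" is an assumption, since no default is listed for it.

```python
# Defaults for drop.tombstones and delete.handling.mode as documented.
# Assumption: an unset add.source.fields means no metadata is added.
DEFAULTS = {
    "drop.tombstones": "true",
    "delete.handling.mode": "drop",
    "add.source.fields": "",
}
ALLOWED_DELETE_MODES = {"drop", "rewrite", "none"}


def effective_options(props, prefix="transforms.unwrap."):
    """Merge user-supplied properties with the documented defaults."""
    raw = dict(DEFAULTS)
    for name, value in props.items():
        option = name[len(prefix):] if name.startswith(prefix) else None
        if option in DEFAULTS:
            raw[option] = value.strip()

    mode = raw["delete.handling.mode"]
    if mode not in ALLOWED_DELETE_MODES:
        raise ValueError("unsupported delete.handling.mode: " + mode)

    fields = [f.strip() for f in raw["add.source.fields"].split(",") if f.strip()]
    return {
        "drop.tombstones": raw["drop.tombstones"].lower() == "true",
        "delete.handling.mode": mode,
        "add.source.fields": fields,
    }


# The properties from the Configuration section, as a dict.
example = {
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.source.fields": "table,lsn",
}

options = effective_options(example)
defaults_only = effective_options({})
```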