New Record State Extraction
| This single message transformation (SMT) is supported for only the SQL database connectors. For the MongoDB connector, see the documentation for the MongoDB equivalent to this SMT. | 
A Debezium data change event has a complex structure that provides a wealth of information. Kafka records that convey Debezium change events contain all of this information. However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values. To provide this kind of record, Debezium provides the event flattening single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain Debezium change events.
The event flattening transformation is a Kafka Connect SMT.
Change event structure
Debezium generates data change events that have a complex structure. Each event consists of three parts:
- 
Metadata, which includes but is not limited to: - 
The operation that made the change 
- 
Source information such as the names of the database and table where the change was made 
- 
Time stamp for when the change was made 
- 
Optional transaction information 
 
- 
- 
Row data before the change 
- 
Row data after the change 
For example, part of the structure of an UPDATE change event looks like this:
{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}More details about change event structure are provided in the documentation for each connector.
This complex format provides the most information about changes happening in the system. However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this:
{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}To provide the needed Kafka record format for consumers, configure the event flattening SMT.
Behavior
The event flattening SMT extracts the after field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its after field to create a simple Kafka record.
You can configure the event flattening SMT for a Debezium connector or for a sink connector that consumes messages emitted by a Debezium connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the transformation to do any of the following:
- 
Add metadata from the change event to the simplified Kafka record. The default behavior is that the SMT does not add metadata. 
- 
Keep Kafka records that contain change events for DELETEoperations in the stream. The default behavior is that the SMT drops Kafka records forDELETEoperation change events because most consumers cannot yet handle them.
A database DELETE operation causes Debezium to generate two Kafka records:
- 
A record that contains "op": "d",thebeforerow data, and some other fields.
- 
A tombstone record that has the same key as the deleted row and a value of null. This record is a marker for Apache Kafka. It indicates that log compaction can remove all records that have this key.
Instead of dropping the record that contains the before row data, you can configure the event flattening SMT to do one of the following:
- 
Keep the record in the stream and edit it to have only the "value": "null"field.
- 
Keep the record in the stream and edit it to have a valuefield that contains the key/value pairs that were in thebeforefield with an added"__deleted": "true"entry.
Similary, instead of dropping the tombstone record, you can configure the event flattening SMT to keep the tombstone record in the stream.
Configuration
Configure the Debezium event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector’s configuration. To obtain the default behavior, in a .properties file, you would specify something like the following:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordStateAs with any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
The following .properties example sets several event flattening SMT options:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn- drop.tombstones=false
- 
Keeps tombstone records for DELETEoperations in the event stream.
- delete.handling.mode=rewrite
- 
For DELETEoperations, edits the Kafka record by flattening thevaluefield that was in the change event. Thevaluefield directly contains the key/value pairs that were in thebeforefield. The SMT adds__deletedand sets it totrue, for example:"value": { "pk": 2, "cola": null, "__deleted": "true" }
- add.fields=table,lsn
- 
Adds change event metadata for the tableandlsnfields to the simplified Kafka record.
The connector might emit many types of event messages (heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes). To apply the transformation to a subset of events, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
Adding metadata
The event flattening SMT can add original, change event metadata to the simplified Kafka record. For example, you might want the simplified record’s header or value to contain any of the following:
- 
The type of operation that made the change 
- 
The name of the database or table that was changed 
- 
Connector-specific fields such as the Postgres LSN field 
For more information on what is available see the documentation for each connector.
To add metadata to the simplified Kafka record’s header, specify the add.headers option.
To add metadata to the simplified Kafka record’s value, specify the add.fields option.
Each of these options takes a comma separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.add.fields=op,table,lsn,source.ts_ms transforms.unwrap.add.headers=db transforms.unwrap.delete.handling.mode=rewrite
With that configuration, a simplified Kafka record would contain something like the following:
{
 ...
	"__op" : "c",
	"__table": "MY_TABLE",
	"__lsn": "123456789",
	"__source_ts_ms" : "123456789",
 ...
}Also, simplified Kafka records would have a __db header.
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
To add metadata to a simplified Kafka record that is for a DELETE operation, you must also configure delete.handling.mode=rewrite.
Options for applying the event-flattening transformation selectively
In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.
For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.
Configuration options
The following table describes the options that you can specify to configure the event flattening SMT.
| Option | Default | Description | 
|---|---|---|
| 
 | Debezium generates a tombstone record for each  | |
| 
 | Debezium generates a change event record for each  | |
| To use row data to determine the topic to route the record to, set this option to an  | ||
| __ (double-underscore) | Set this optional string to prefix a field. | |
| Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record’s value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example  | ||
| __ (double-underscore) | Set this optional string to prefix a header. | |
| Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example  |