MongoDB New Document State Extraction
The Debezium MongoDB connector emits data change messages to represent each operation that occurs in a MongoDB collection. The complex structure of these event messages faithfully represents the details of the original database event. However, some downstream consumers might not be able to process the messages in their original format. For example, to represent nested documents in a data collection, the connector emits an event message in a format that includes nested fields. To support sink connectors, or other consumers that cannot process the hierarchical format of the original messages, you can use the Debezium MongoDB event flattening (ExtractNewDocumentState) single message transformation (SMT). The SMT simplifies the structure of the original messages, and can modify messages in other ways to make data easier to process.
The event flattening transformation is a Kafka Connect SMT.
The information in this chapter describes the event flattening single message transformation (SMT) for Debezium MongoDB connectors only. For information about an equivalent SMT for use with relational databases, see the documentation for the New Record State Extraction SMT.
Change event structure
The Debezium MongoDB connector generates change events that have a complex structure. Each event message includes the following parts:
- Source metadata
  Includes, but is not limited to, the following fields:
  - Type of the operation that changed data in the collection (create/insert, update, or delete).
  - Name of the database and collection in which the change occurred.
  - Timestamp that identifies when the change was made.
  - Optional transaction information.
- Document data
  - before data
    This field is present in environments that run MongoDB 6.0 and later when the capture.mode for the Debezium connector is set to one of the following values:
    - change_streams_with_pre_image
    - change_streams_update_full_with_pre_image
    For more information, see MongoDB pre-image support.
  - after data
    JSON strings that represent the values that are present in a document after the current operation. The presence of an after field in an event message depends on the type of event and the connector configuration. A create event for a MongoDB insert operation always contains an after field, regardless of the capture.mode setting. For update events, the after field is present only when capture.mode is set to one of the following values:
    - change_streams_update_full
    - change_streams_update_full_with_pre_image
    The after value in a change event message does not necessarily represent the state of a document immediately following the event. The value is not calculated dynamically; instead, after the connector captures a change event, it queries the collection to retrieve the current value of the document.
    For example, imagine a situation in which multiple operations, a, b, and c, modify a document in quick succession. When the connector processes change a, it queries the collection for the full document. In the meantime, changes b and c occur. When the connector receives a response to its query for the full document for change a, it might receive a version of the document that is based on the subsequent changes for b or c. For more information, see the documentation for the capture.mode property.
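The timing issue described for the after value can be illustrated with a small simulation. The following Python sketch is only a model of the behavior, not connector code: the in-memory collection, the status field, and the full_document_lookup helper are all hypothetical names introduced for the example.

```python
# Hypothetical in-memory stand-in for a MongoDB collection holding one document.
collection = {"_id": 1, "status": "new"}

# Three updates (a, b, c) that arrive in quick succession.
changes = [
    ("a", {"status": "paid"}),
    ("b", {"status": "shipped"}),
    ("c", {"status": "delivered"}),
]

def full_document_lookup():
    """Model of the lookup that returns the document's *current* state."""
    return dict(collection)

# All three updates are applied before the lookup for change a completes.
for _name, update in changes:
    collection.update(update)

# The after value reported for change a reflects the later changes b and c.
after_for_change_a = full_document_lookup()
print(after_for_change_a)
```

In this model, the after value that is associated with change a carries the status written by change c, which is why consumers should not assume that after is the exact post-image of the event that carries it.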
The following fragment shows the basic structure of a create change event that the connector emits after a MongoDB insert operation:
{
  "op": "c",
  "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue2\"}",
  "source": { ... }
}
The complex format of the after field in the preceding example provides detailed information about changes that occur in the source database.
However, some consumers cannot process messages that contain nested values.
To convert the complex nested fields of the original message into a simpler, more universally compatible structure, use the event flattening SMT for MongoDB.
The SMT flattens the structure of nested fields in a message, as shown in the following example:
{
  "field1" : "newvalue1",
  "field2" : "newvalue2"
}
For more information about the default structure of messages produced by the Debezium MongoDB connector, see the connector documentation.
Behavior
The event flattening SMT for MongoDB extracts the after field from create or update change event messages emitted by the Debezium MongoDB connector.
After the SMT processes the original change event message, it generates a simplified version that contains only the contents of the after field.
Depending on your use case, you can apply the ExtractNewDocumentState SMT to the Debezium MongoDB connector, or to a sink connector that consumes messages that the Debezium connector produces. If you apply the SMT to the Debezium MongoDB connector, the SMT modifies messages that the connector emits before they are sent to Apache Kafka. To ensure that Kafka retains the complete Debezium change event message in its original format, apply the SMT to a sink connector.
When you use the event flattening SMT to process a message emitted from a MongoDB connector, the SMT converts the structure of the records in the original message into properly typed Kafka Connect records that can be consumed by a typical sink connector.
For example, the SMT converts the JSON strings that represent the after information in the original message into schema structures that any consumer can process.
Optionally, you can configure the event flattening SMT for MongoDB to modify messages in other ways during processing. For more information, see the configuration topic.
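As a rough illustration of the core extraction step, the following Python sketch parses the JSON string in the after field of a create event and returns it as a structured record. The function name and the sample event (including the contents of source) are assumptions made for the example. The actual SMT runs inside Kafka Connect and also builds a typed schema, which this sketch does not attempt.

```python
import json

def extract_new_document_state(event):
    """Return the parsed `after` document of an event, or None if it has none."""
    after = event.get("after")
    if after is None:
        return None
    # The connector carries the document as a JSON string; parse it into a dict.
    return json.loads(after)

# Sample create event; the `source` content is hypothetical.
event = {
    "op": "c",
    "after": '{"field1":"newvalue1","field2":"newvalue2"}',
    "source": {"db": "inventory", "collection": "customers"},
}

flattened = extract_new_document_state(event)
print(flattened)
```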
Configuration
Configure the event flattening (ExtractNewDocumentState) SMT for MongoDB for sink connectors that consume the messages emitted by the Debezium MongoDB connector.
Basic configuration
To obtain the default behavior of the SMT, add the SMT to the configuration of a sink connector without specifying any options, as in the following example:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
As with any Kafka Connect connector configuration, you can set transforms= to multiple, comma-separated SMT aliases.
Kafka Connect applies the transformations that you specify in the order in which they are listed.
You can set multiple options for a connector that uses the MongoDB event flattening SMT.
The following example shows a configuration that sets the drop.tombstones, delete.handling.mode, and add.headers options for a connector:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.add.headers=op
For more information about the configuration options in the preceding example, see the configuration topic.
The connector might emit many types of event messages (for example, heartbeat messages, tombstone messages, or metadata messages about transactions). To apply the transformation to a subset of events, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
Array encoding
By default, the event flattening SMT converts MongoDB arrays into arrays that are compatible with Apache Kafka Connect, or Apache Avro schemas. While MongoDB arrays can contain multiple types of elements, all elements in a Kafka array must be of the same type.
To ensure that the SMT encodes arrays in a way that meets the needs of your environment, you can specify the array.encoding configuration option.
The following example shows the configuration for setting the array encoding:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>
Depending on the configuration, the SMT processes each instance of an array in the source message by using one of the following encoding methods:
- array encoding
  If array.encoding is set to array (the default), the SMT uses the array data type to encode arrays in the original message. To ensure correct processing, all elements in an array instance must be of the same type. This option is restrictive, but it enables downstream clients to easily process arrays.
- document encoding
  If array.encoding is set to document, the SMT converts each array in the source into a struct of structs, in a manner that is similar to BSON serialization. The main struct contains fields named _0, _1, _2, and so on, where each field name represents the index of an element in the original array. The SMT populates each of these index fields with the values that it retrieves for the equivalent element in the source array. Index names are prefixed with underscores, because Avro encoding prohibits field names that begin with a numeric character.
The following example shows how the Debezium MongoDB connector represents a database document that contains an array that includes heterogeneous data types:
{
  "_id": 1,
  "a1": [
    {
      "a": 1,
      "b": "none"
    },
    {
      "a": "c",
      "d": "something"
    }
  ]
}
If array.encoding is set to document, the SMT converts the preceding document into the following format:
{
  "_id": 1,
  "a1": {
    "_0": {
      "a": 1,
      "b": "none"
    },
    "_1": {
      "a": "c",
      "d": "something"
    }
  }
}
The document encoding option enables the SMT to process arbitrary arrays that are composed of heterogeneous elements.
However, before you use this option, always verify that the sink connector and other downstream consumers are capable of processing arrays that contain multiple data types.
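The document encoding described in this section can be sketched in a few lines of Python. This is a simplified model that works on plain dictionaries and lists, under the assumption that index fields are named with a leading underscore as described above. The function name is hypothetical, and the real SMT operates on Kafka Connect structs and schemas.

```python
def encode_arrays_as_documents(value):
    """Recursively replace each list with a dict keyed by _0, _1, _2, ..."""
    if isinstance(value, list):
        return {
            f"_{index}": encode_arrays_as_documents(element)
            for index, element in enumerate(value)
        }
    if isinstance(value, dict):
        return {
            name: encode_arrays_as_documents(field)
            for name, field in value.items()
        }
    # Scalars are returned unchanged.
    return value

document = {
    "_id": 1,
    "a1": [
        {"a": 1, "b": "none"},
        {"a": "c", "d": "something"},
    ],
}

encoded = encode_arrays_as_documents(document)
print(encoded)
```

Because every element becomes a separately named field, the elements no longer need to share a type, which is the property that makes this encoding suitable for heterogeneous arrays.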
Nested structure flattening
When a database operation involves an embedded document, the Debezium MongoDB connector emits a Kafka event record that has a structure that reflects the hierarchical structure of the original document. That is, the event message represents nested documents as a set of nested field structures. In environments where downstream connectors cannot process messages that contain nested structures, you can configure the event flattening SMT to flatten hierarchical structures in the message. A flat message structure is better suited to table-like storage.
To configure the SMT to flatten nested structures, set the flatten.struct configuration option to true.
In the converted message, field names are constructed to be consistent with the document source.
The SMT renames each flattened field by concatenating the name of the parent document field with the name of the nested document field.
A delimiter that is defined by the flatten.struct.delimiter option separates the components of the name.
The default value of flatten.struct.delimiter is an underscore character (_).
The following example shows the configuration for specifying whether the SMT flattens nested structures:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>
The following example shows an event message that is emitted by the MongoDB connector.
The message includes a field for a document a that contains two nested fields, b and c:
{
  "_id": 1,
  "a": {
    "b": 1,
    "c": "none"
  },
  "d": 100
}
The message in the following example shows the output after the SMT for MongoDB flattens the nested structures in the preceding message:
{
  "_id": 1,
  "a_b": 1,
  "a_c": "none",
  "d": 100
}
In the resulting message, the b and c fields that were nested in the original message are flattened and renamed.
The renamed fields are formed by concatenating the name of the parent document a with the names of the nested fields: a_b and a_c.
The components of the new field names are separated by an underscore character, as defined by the setting of the flatten.struct.delimiter configuration property.
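The renaming rule for flattened fields can be expressed as a short recursive function. The following Python sketch is an illustrative model on plain dictionaries, not the SMT's implementation. The function name and its parameters are assumptions, with delimiter standing in for the flatten.struct.delimiter option.

```python
def flatten_struct(document, delimiter="_", parent=""):
    """Flatten nested dicts by joining parent and child names with a delimiter."""
    flat = {}
    for name, value in document.items():
        # Top-level fields keep their name; nested fields get the parent prefix.
        key = f"{parent}{delimiter}{name}" if parent else name
        if isinstance(value, dict):
            flat.update(flatten_struct(value, delimiter, key))
        else:
            flat[key] = value
    return flat

message = {"_id": 1, "a": {"b": 1, "c": "none"}, "d": 100}

flattened_message = flatten_struct(message)
print(flattened_message)
```

Passing a different delimiter, for example flatten_struct(message, delimiter="."), yields names such as a.b and a.c, which mirrors the effect of changing the delimiter option.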
MongoDB $unset handling
In MongoDB, the $unset operator and the $rename operator both remove fields from a document.
Because MongoDB collections are schemaless, after an update removes fields from a document, it’s not possible to infer the name of the missing field from the updated document.
To support sink connectors or other consumers that might require information about removed fields, Debezium emits update messages that include a removedFields element that lists the names of the deleted fields.
The following example shows part of an update message for an operation that results in the removal of the field a:
"payload": {
"op": "u",
"ts_ms": "...",
"ts_us" : "...",
"ts_ns" : "...",
"before": "{ ... }",
"after": "{ ... }",
"updateDescription": {
"removedFields": ["a"],
"updatedFields": null,
"truncatedArrays": null
}
}
In the preceding example, the before and after fields represent the state of the source document before and after the document was updated.
These fields are present in the event message that a connector emits only if the capture.mode for the connector is set as described in the following list:
- before field
  Provides the state of the document before the change. This field is present only when capture.mode is set to one of the following values:
  - change_streams_with_pre_image
  - change_streams_update_full_with_pre_image
- after field
  Provides the full state of the document after a change. This field is present only when capture.mode is set to one of the following values:
  - change_streams_update_full
  - change_streams_update_full_with_pre_image
Assuming a connector that is configured to capture full documents, when the ExtractNewDocumentState SMT receives an update message for an $unset event, the SMT re-encodes the message by representing the removed field as a null value, as shown in the following example:
{
  "id": 1,
  "a": null
}
For connectors that are not configured to capture full documents, when the SMT receives an update event for an $unset operation, it produces the following output message:
{
  "a": null
}
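The two outputs shown in this section can be modeled with one small function. The following Python sketch is a simplification under stated assumptions: it handles only removedFields, ignores updatedFields and truncatedArrays, and the function name and sample events are hypothetical.

```python
import json

def apply_removed_fields(event):
    """Build a flattened record in which each removed field is set to None."""
    after = event.get("after")
    # Start from the full document if the event carries one, else from nothing.
    record = json.loads(after) if after else {}
    description = event.get("updateDescription") or {}
    for field in description.get("removedFields") or []:
        record[field] = None
    return record

# Update event from a connector that captures full documents.
full_event = {
    "op": "u",
    "after": '{"id": 1}',
    "updateDescription": {"removedFields": ["a"], "updatedFields": None},
}

# Update event from a connector that does not capture full documents.
partial_event = {
    "op": "u",
    "updateDescription": {"removedFields": ["a"], "updatedFields": None},
}

print(apply_removed_fields(full_event))
print(apply_removed_fields(partial_event))
```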
Determine original operation
After the SMT flattens an event message, the resulting message no longer indicates whether the operation that generated the event was of type create, update, or initial snapshot read.
Typically, you can identify delete operations by configuring the connectors to expose information about the tombstone or rewrite events that accompany a deletion.
For more information about configuring the connector to expose information about tombstones and rewrites in event messages, see the drop.tombstones and delete.handling.mode properties.
To report the type of a database operation in an event message, the SMT can add an op field to one of the following elements:
- The event message body.
- A message header.
For example, to add a header property that shows the type of the original operation, add the transform, and then add the add.headers property to the connector configuration, as in the following example:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op
Based on the preceding configuration, the SMT reports the event type by adding an op header to the message and assigning it a string value to identify the type of the operation.
The assigned string value is based on the op field value in the original MongoDB change event message.
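To make the header mechanism concrete, the following Python sketch copies selected top-level fields of a change event into a header map. It is a model only: build_headers is a hypothetical helper, and the default prefix of a double underscore is assumed from the add.headers.prefix entry in the configuration options table. The mapping of operation codes to names covers the codes that Debezium uses in the op field.

```python
# Operation codes that Debezium uses in the `op` field of a change event.
OPERATION_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read"}

def build_headers(event, header_fields, prefix="__"):
    """Copy the listed top-level event fields into prefixed header entries."""
    return {
        f"{prefix}{name}": event[name]
        for name in header_fields
        if name in event
    }

event = {"op": "u", "after": '{"field1":"newvalue1"}'}

headers = build_headers(event, ["op"])
print(headers)
print(OPERATION_NAMES[headers["__op"]])
```

A consumer that reads the flattened record can then inspect the header to distinguish, for example, an update from a snapshot read without needing the original event envelope.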
Adding metadata fields
The event flattening SMT for MongoDB can add metadata fields from the original change event message to the simplified message.
The added metadata fields are prefixed with a double underscore ("__").
Adding metadata to the event record makes it possible to include content such as the name of the collection in which a change event occurred, or to include connector-specific fields, such as a replica set name.
Currently, the SMT can add fields from the following change event sub-structures only: source, transaction, and updateDescription.
For more information about the MongoDB change event structure, see the MongoDB connector documentation.
For example, you might specify the following configuration to add the replica set name (rs) and the collection name for a change event to the final flattened event record:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection
The preceding configuration results in the following content being added to the flattened record:
{ "__rs" : "rs0", "__collection" : "my-collection", ... }
If you want the SMT to add metadata fields to delete events, set the value of the delete.handling.mode option to rewrite.
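The effect of the add.fields setting in this section can be approximated as follows. This Python sketch is a simplified model: it assumes that unqualified names such as rs and collection are resolved against the source sub-structure, and it does not handle the transaction or updateDescription sub-structures or field renaming. The function name and the sample source values are hypothetical.

```python
def add_metadata_fields(flattened, event, field_names, prefix="__"):
    """Return a copy of the flattened record with prefixed metadata fields added."""
    source = event.get("source") or {}
    enriched = dict(flattened)
    for name in field_names:
        # Assumption: unqualified field names are looked up in `source`.
        if name in source:
            enriched[f"{prefix}{name}"] = source[name]
    return enriched

event = {
    "op": "c",
    "after": '{"field1":"newvalue1"}',
    "source": {"rs": "rs0", "collection": "my-collection", "db": "inventory"},
}

record = add_metadata_fields({"field1": "newvalue1"}, event, ["rs", "collection"])
print(record)
```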
Options for applying the transformation selectively
In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.
For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.
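One common way to apply a transformation selectively is to match on the topic name, which is the idea behind topic-based predicates such as the TopicNameMatches predicate in Kafka Connect. The following Python sketch models that decision outside of Kafka Connect. The topic names and the regular expression are hypothetical and assume that data change topics follow a prefix.database.collection naming scheme.

```python
import re

# Hypothetical pattern: only topics for collections in the `inventory` database.
DATA_CHANGE_TOPIC = re.compile(r"mongo\.inventory\..+")

def should_apply_smt(topic):
    """Return True only for topics that carry data change events."""
    return DATA_CHANGE_TOPIC.fullmatch(topic) is not None

topics = [
    "mongo.inventory.customers",   # data change events
    "__debezium-heartbeat.mongo",  # heartbeat messages
    "mongo.transaction",           # transaction metadata
]

selected = [topic for topic in topics if should_apply_smt(topic)]
print(selected)
```

In this model, heartbeat and transaction metadata topics are skipped, so messages whose structure the SMT is not designed to process never reach it.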
Configuration options
The following table describes the configuration options for the MongoDB event flattening SMT.
Property | Default | Description
---|---|---
|
Specifies the format that the SMT uses when it encodes arrays that it reads from the original event message. Set one of the following options:
For more information about the |
|||
|
The SMT flattens structures (structs) in the original event message by concatenating the names of nested properties in the message, separated by a configurable delimiter, to form a simple field name. |
|||
|
When |
|||
|
Debezium generates a tombstone record for each
|
|||
|
Specifies how the SMT handles the change event records that Debezium generates for
|
|||
No default |
Debezium generates a change event record for each
Set one of the following options:
|
|||
|
When set to |
|||
__ (double-underscore) |
Set this optional string to prefix a header. |
|||
No default |
Specifies a comma-separated list, with no spaces, of metadata fields that you want the SMT to add to the header of simplified messages.
When the original message contains duplicate field names, you can identify the specific field to modify by providing the name of the struct together with the name of the field, for example, Optionally, you can override the original name of a field and assign it a new name by adding an entry in the following format to the list:
For example:
The new name values that you specify are case-sensitive. |
|||
__ (double-underscore) |
Specifies an optional string to prefix to a field name. |
|||
No default |
Set this option to a comma-separated list, with no spaces, of metadata fields to add to the
For example:
The new name values that you specify are case-sensitive. When the SMT adds metadata fields to the |
Known limitations
- Because MongoDB is a schemaless database, fields within a collection that have the same name must store the same type of data to ensure consistent column definitions when you use Debezium to stream changes to a schema-based relational database.
- Configure the SMT to produce messages in the format that is compatible with the sink connector. If a sink connector requires a "flat" message structure, but it receives a message that encodes an array in the source MongoDB document as a struct of structs, the sink connector cannot process the message.