MongoDB CDC Event Flattening

This single message transformation (SMT) is available since Debezium version 0.7.2. It is under active development right now, so the emitted message structure or other details may still change as development progresses. Please see below for a descriptions of known limitations of this transformation.

This SMT is supported only for the MongoDB connector.

The Debezium MongoDB connector generates the data in a form of a complex message structure. The message consists of two parts:

  • operation and metadata

  • for inserts, the row data after the insert has been executed; for updates a patch element describing the altered fields

The after and patch elements are Strings containing JSON representations of the inserted/altered data. E.g. the general message structure for a insert event looks like this:

{
  "op": "r",
  "after": "{\"field1\":newvalue1,\"field2\":\"newvalue1\"}",
  "source": { ... }
}

More details about the message structure are provided in the documentation of the MongoDB connector.

While this structure is a good fit to represent changes to MongoDB’s schemaless collections, it’s not understood by existing sink connectors as for instance the Confluent JDBC sink connector.

Therefore Debezium provides a a single message transformation (SMT) which converts the after/patch information from the MongoDB CDC events into a structure suitable for consumption by existing sink connectors. To do so, the SMT parses the JSON strings and reconstructs properly typed Kafka Connect (comprising the correct message payload and schema) records from that, which then can be consumed by connectors such as the JDBC sink connector.

Using JSON as visualization of the emitted record structure, the event from above would like this:

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

The SMT should be applied on a sink connector.

Configuration

The configuration is a part of sink task connector and is expressed in a set of properties:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

Known limitations

  • Feeding data changes from a schemaless store such as MongoDB to strictly schema-based datastores such as a relational database can by definition work within certain limits only. Specifically, all fields of documents within one collection with the same name must be of the same type. Otherwise, no consistent column definition can be derived in the target database.

  • Arrays will be restored in the emitted Kafka Connect record correctly, but they are not supported my sink connector just expecting a "flat" message structure.

  • Currently it’s not possible to drop deletes and tomb stone events. This requirement is tracked as DBZ-563.

  • It’s planned to optionally flatten nested structures (which are no arrays). That way a complex field such as _id : { comp: 32, cust: NumberLong("1008") } would be represented as id_comp : 32, id_cust: 1008 in the emitted message. This requirement is tracked as DBZ-561.

back to top