Exporting CloudEvents

Support for CloudEvents is currently in an incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know of your specific requirements or if you encounter any problems while using this feature.

CloudEvents is a "specification for describing event data in a common way", aiming at providing "interoperability across services, platforms and systems". It defines:

  • a set of standardized event attributes

  • rules for defining custom attributes

  • encoding rules for mapping event formats to serialized representations such as JSON or Avro

  • protocol bindings for transport layers such as Apache Kafka, HTTP or AMQP

By means of a specific Kafka Connect message converter (io.debezium.converters.CloudEventsConverter), Debezium optionally emits change events that adhere to the CloudEvents specification. Currently, the structured mode of mapping CloudEvents is supported, using JSON and/or Avro as the envelope and the data format. Support for the binary mode of mapping CloudEvents will be added in a future version of Debezium.

Event Format

The following example shows a CloudEvents event of the Debezium Postgres connector, using JSON as envelope and data format:

{
  "id" : "name:test_server;lsn:29274832;txId:565",   (1)
  "source" : "/debezium/postgresql/test_server",     (2)
  "specversion" : "1.0",                             (3)
  "type" : "io.debezium.postgresql.datachangeevent", (4)
  "time" : "2020-01-13T13:55:39.738Z",               (5)
  "datacontenttype" : "application/json",            (6)
  "iodebeziumop" : "r",                              (7)
  "iodebeziumversion" : "1.1.0-SNAPSHOT",            (8)
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "565",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",                           (9)
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {                                         (10)
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
1 Unique id of the change event, based on the connector’s logical name ("test_server") and connector-specific attributes identifying the event
2 Source of the event, i.e. the logical name of the database
3 The CloudEvents specification version
4 The type of change event
5 Time of the change in the source database
6 Describes the content type of the data attribute; JSON in this case
7 Operation type identifier (one of (r)ead, (c)reate, (u)pdate or (d)elete)
8 All the source attributes known from Debezium data change events are mapped to CloudEvents extension attributes, using the "iodebezium" prefix for the attribute name
9 All the transaction attributes known from Debezium data change events (when enabled in connector) are mapped to CloudEvents extension attributes, using the "iodebeziumtx" prefix for the attribute name
10 The actual data change itself; depending on the kind of change and the type of connector, may contain the before, after and/or patch fields
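
As a rough illustration of how a consumer might work with such an event, the following Python sketch parses a structured-mode JSON envelope and separates the core CloudEvents attributes from the Debezium extension attributes. The helper name split_attributes and the trimmed sample event are hypothetical and not part of Debezium; only the attribute names and values are taken from the example above.

```python
import json

# Core CloudEvents attributes that appear in the examples of this document.
CORE_ATTRIBUTES = {"id", "source", "specversion", "type", "time",
                   "datacontenttype", "dataschema"}


def split_attributes(raw_event):
    """Split a JSON CloudEvent into core attributes, Debezium extensions and data.

    Hypothetical helper for illustration only; not part of Debezium.
    """
    event = json.loads(raw_event)
    core = {k: v for k, v in event.items() if k in CORE_ATTRIBUTES}
    extensions = {k: v for k, v in event.items() if k.startswith("iodebezium")}
    return core, extensions, event.get("data")


# Trimmed copy of the sample event, with the callout markers removed.
sample = """
{
  "id": "name:test_server;lsn:29274832;txId:565",
  "source": "/debezium/postgresql/test_server",
  "specversion": "1.0",
  "type": "io.debezium.postgresql.datachangeevent",
  "time": "2020-01-13T13:55:39.738Z",
  "datacontenttype": "application/json",
  "iodebeziumop": "r",
  "iodebeziumtable": "a",
  "data": {"before": null, "after": {"pk": 1, "name": "Bob"}}
}
"""

core, extensions, data = split_attributes(sample)
print(core["type"])
print(extensions["iodebeziumop"])
print(data["after"])
```

Filtering on the "iodebezium" prefix also picks up the transaction attributes, since their names start with "iodebeziumtx".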

Alternatively, Avro can be used as content type for the data attribute:

{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.postgresql.datachangeevent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",            (1)
  "dataschema" : "http://my-registry/schemas/ids/1", (2)
  "iodebeziumop" : "r",
  "iodebeziumversion" : "1.1.0-SNAPSHOT",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="                        (3)
}
1 Designates the data attribute to be of type Avro binary data
2 URI of the schema to which the Avro data adheres
3 The data attribute, base64-encoded Avro binary data
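
A consumer has to base64-decode the data attribute before handing it to an Avro decoder. The sketch below performs only that decoding step, using the Python standard library. It additionally assumes that the decoded bytes start with the Confluent Schema Registry framing (one magic byte followed by a 4-byte big-endian schema id); that framing is an assumption made here, although the sample value above is consistent with it. The helper name unwrap_avro_data is hypothetical.

```python
import base64
import struct


def unwrap_avro_data(encoded):
    """Decode a base64 'data' value and split off the assumed registry header.

    Assumption: the bytes use Confluent Schema Registry framing, i.e.
    1 magic byte (0) + 4-byte big-endian schema id + Avro binary body.
    """
    raw = base64.b64decode(encoded)
    if len(raw) < 5:
        raise ValueError("payload too short for the assumed framing")
    # ">bI" = big-endian, one signed byte followed by one unsigned 32-bit int.
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, raw[5:]


# The sample value of the data attribute from the event above.
schema_id, avro_body = unwrap_avro_data("AAAAAAEAAgICAg==")
print(schema_id, len(avro_body))
```

The remaining bytes would then be passed to an Avro decoder together with the schema referenced by the dataschema attribute.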

Finally, it’s also possible to use Avro for the entire envelope as well as the data attribute.

Configuration Options

The following configuration options exist when using the CloudEvents converter:

Property (default): description

  • serializer.type (default: json): The encoding type to use for the CloudEvents envelope structure; one of json or avro.

  • data.serializer.type (default: json): The encoding type to use for the data attribute; can be json if serializer.type is avro; can be json or avro, if serializer.type is json or avro.

  • json. ... (default: N/A): Any configuration options to be passed through to the underlying converter when using JSON (the "json." prefix will be removed)

  • avro. ... (default: N/A): Any configuration options to be passed through to the underlying converter when using Avro (the "avro." prefix will be removed)
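
To make the pass-through behaviour of the "json." and "avro." prefixes more concrete, the following Python sketch shows one way such prefix stripping could work. It is an illustration written for this document, not the converter's actual implementation; the function name options_with_prefix is hypothetical, and the sample values are borrowed from the example configuration in this section.

```python
def options_with_prefix(config, prefix):
    """Return the options whose key starts with `prefix`, with the prefix removed."""
    return {key[len(prefix):]: value
            for key, value in config.items()
            if key.startswith(prefix)}


# Options as the converter is assumed to see them, i.e. after Kafka Connect
# has removed the leading "value.converter." part of each key.
converter_config = {
    "serializer.type": "json",
    "data.serializer.type": "avro",
    "avro.schema.registry.url": "http://schema-registry:8081",
}

# Options that would be handed to the underlying Avro and JSON converters.
avro_options = options_with_prefix(converter_config, "avro.")
json_options = options_with_prefix(converter_config, "json.")
print(avro_options)
print(json_options)
```

In this sketch the underlying Avro converter would receive schema.registry.url, while the underlying JSON converter would receive no extra options.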

The following shows an example configuration for using JSON as envelope format (the default, so value.converter.serializer.type could also be omitted) and Avro as data content type:

...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://schema-registry:8081"
...

Note that this converter is meant solely for serializing Kafka record values; record keys can be serialized with any other converter, e.g. the String, Long, JSON or Avro converters.
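
The following Python sketch assembles the converter-related settings for such a combination and prints them as JSON. The value converter settings are copied from the example above; the choice of Kafka Connect's StringConverter for record keys is only an assumption for illustration, and any other key converter could be used instead.

```python
import json

# Converter-related part of a connector configuration. The key converter
# (Kafka Connect's StringConverter) is an assumed choice for illustration;
# the value converter settings mirror the example in this section.
converter_settings = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.debezium.converters.CloudEventsConverter",
    "value.converter.serializer.type": "json",
    "value.converter.data.serializer.type": "avro",
    "value.converter.avro.schema.registry.url": "http://schema-registry:8081",
}

# Print the settings in the JSON form used for connector registration.
print(json.dumps(converter_settings, indent=2))
```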