Outbox Event Router

The outbox pattern is a way to safely and reliably exchange data between multiple (micro)services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and the state in events consumed by services that need the same data.

To implement the outbox pattern in a Debezium application, configure a Debezium connector to:

  • Capture changes in an outbox table

  • Apply the Debezium outbox event router single message transformation (SMT)

A Debezium connector that is configured to apply the outbox SMT should capture changes that occur in an outbox table only. For more information, see Options for applying the transformation selectively.

A connector can capture changes in more than one outbox table only if each outbox table has the same structure.

See Reliable Microservices Data Exchange With the Outbox Pattern to learn about why the outbox pattern is useful and how it works.

For an example that you can run, see the outbox pattern demo, which is in the Debezium examples repository. It includes an example of how to configure a Debezium connector to run the outbox event router SMT.

The outbox event router SMT is not compatible with the MongoDB connector.

MongoDB users can run the MongoDB outbox event router SMT.
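As a minimal sketch, a MongoDB connector would reference the MongoDB-specific SMT class instead of the EventRouter class shown elsewhere on this page. The class name below is an assumption; verify it against the MongoDB outbox event router documentation for your Debezium version:

```properties
transforms=outbox,...
# Assumed class name of the MongoDB outbox event router SMT; confirm for your version
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
```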

Example outbox message

To understand how the Debezium outbox event router SMT is configured, review the following example of a Debezium outbox message:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

A Debezium connector that is configured to apply the outbox event router SMT generates the above message by transforming a Debezium raw message like this:

# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "2.7.2.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484,
  "ts_us": 1556890294484651,
  "ts_ns": 1556890294484651402
}

This example of a Debezium outbox message is based on the default outbox event router configuration, which assumes an outbox table structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous configuration options.

Basic outbox table

The default outbox event router SMT configuration assumes that your outbox table has the following columns:

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |

Table 1. Descriptions of expected outbox table columns
Column Effect

id

Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages.

To obtain the unique ID of the event from a different outbox table column, set the table.field.event.id SMT option in the connector configuration.

aggregatetype

Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default ${routedByValue} variable in the route.topic.replacement SMT option.

For example, in a default configuration, the route.by.field SMT option is set to aggregatetype and the route.topic.replacement SMT option is set to outbox.event.${routedByValue}. Suppose that your application adds two records to the outbox table. In the first record, the value in the aggregatetype column is customers. In the second record, the value in the aggregatetype column is orders. The connector emits the first record to the outbox.event.customers topic. The connector emits the second record to the outbox.event.orders topic.

To obtain this value from a different outbox table column, set the route.by.field SMT option in the connector configuration.

aggregateid

Contains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

To obtain the event key from a different outbox table column, set the table.field.event.key SMT option in the connector configuration.

payload

A representation of the outbox change event. The default structure is JSON. By default, the Kafka message value consists solely of the payload value. However, if the outbox event is configured to include additional fields, the Kafka message value contains an envelope that encapsulates both the payload and the additional fields, and each field is represented separately. For more information, see Emitting messages with additional fields.

To obtain the event payload from a different outbox table column, set the table.field.event.payload SMT option in the connector configuration.

Additional custom columns

Any additional columns from the outbox table can be added to outbox events either within the payload section or as a message header.

For example, an eventType column might convey a user-defined value that helps to categorize or organize events.
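If your outbox table uses column names other than the defaults, the per-column options described above can be combined in one connector configuration. The following is a minimal sketch; the column names event_id, aggregate_kind, aggregate_key, and event_body are hypothetical and stand in for whatever your table defines:

```properties
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Hypothetical column names; replace them with the columns in your outbox table
transforms.outbox.table.field.event.id=event_id
transforms.outbox.route.by.field=aggregate_kind
transforms.outbox.table.field.event.key=aggregate_key
transforms.outbox.table.field.event.payload=event_body
```

Any option that you omit keeps its default, so you need to set only the options for columns whose names differ from the expected structure.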

Basic configuration

To configure a Debezium connector to support the outbox pattern, configure the outbox.EventRouter SMT. To obtain the default behavior of the SMT, add it to the connector configuration without specifying any options, as in the following example:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

Customizing the configuration

The connector might emit many types of event messages (for example, heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes). To apply the transformation only to events that originate in the outbox table, define an SMT predicate statement that selectively applies the transformation to those events only.

Options for applying the transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages. To apply the SMT selectively, you can configure an SMT predicate for the transformation or use the route.topic.regex configuration option of the SMT.
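One way to apply the SMT selectively is to attach a Kafka Connect predicate to it. The following sketch uses the TopicNameMatches predicate that ships with Kafka Connect so that the SMT runs only on records whose original topic name ends in .outboxevent. The predicate alias isOutboxTable and the topic pattern are assumptions for illustration; adjust the pattern to the topic name that your connector assigns to the outbox table:

```properties
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Apply the SMT only to records for which the predicate is true
transforms.outbox.predicate=isOutboxTable
predicates=isOutboxTable
predicates.isOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# [.] matches a literal dot without requiring backslash escapes
predicates.isOutboxTable.pattern=.*[.]outboxevent
```

Records that do not satisfy the predicate, such as heartbeat or schema change messages, pass through the connector without being processed by the SMT.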

Payload serialization format

The outbox event router SMT supports arbitrary payload formats. The SMT passes on payload column values that it reads from the outbox table without modification. The way that the SMT converts these column values into Kafka message fields depends on how you configure the SMT. Common payload formats for serializing data are JSON and Avro.

Using JSON as the payload format

The default serialization format for the outbox event router SMT is JSON. To use this format, the data type of the source column must be JSON (for example, jsonb in PostgreSQL).

Expanding escaped JSON String as JSON

When a Debezium outbox message represents the payload as a JSON String, the resulting Kafka message escapes the string as in the following example:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

The outbox event router enables you to expand the message content to "real" JSON, deducing the companion schema from the JSON document. The resulting Kafka message is formatted as in the following example:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

To enable this expansion, set the table.expand.json.payload option to true, and use the JsonConverter as shown in the following example:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter

Using Apache Avro as the payload format

Apache Avro is a common framework for serializing data. Using Avro can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.

How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation. One possibility is to leverage the KafkaAvroSerializer class to serialize GenericRecord instances. To ensure that the Kafka message value is the exact Avro binary data, apply the following configuration to the connector:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter

By default, the payload column value (the Avro data) is the only message value. When data is stored in Avro format, the column format must be set to a binary data type, such as bytea in PostgreSQL. The value converter for the SMT must also be set to BinaryDataConverter, so that it propagates the binary value of the payload column as-is into the Kafka message value.

Debezium connectors can be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector). The BinaryDataConverter cannot serialize these events, so you must provide additional configuration that tells the converter how to serialize them. For example, the following configuration uses the Apache Kafka JsonConverter with schemas disabled:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

The delegate Converter implementation is specified by the delegate.converter.type option. If any extra configuration options are needed by the converter, they can also be specified, such as the disablement of schemas shown above using schemas.enable=false.

The following example illustrates how to configure the SMT to use a delegate converter with an Apicurio Registry to convert data into Avro format:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter
value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.delegate.converter.apicurio.registry.auto-register=true
value.converter.delegate.converter.registry.find-latest=true

Finally, the following example illustrates how to configure the SMT to use a delegate converter with a Confluent Schema Registry to convert data into Avro format:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter
value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO
value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS}
value.converter.delegate.converter.type.schema.registry.url={URL}

In the preceding configuration examples, because the AvroConverter is configured as a delegate converter, third-party libraries are required. Information about how to add third-party libraries to the classpath is beyond the scope of this document.

Emitting messages with additional fields

Your outbox table might contain columns whose values you want to add to the emitted outbox messages. For example, consider an outbox table that has a value of purchase-order in the aggregatetype column and another column, eventType, whose possible values are order-created and order-shipped. Additional fields can be added with the syntax column:placement:alias.

The allowed values for placement are:

  • header

  • envelope

  • partition

To emit the eventType column value in the outbox message header, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type

The result will be a header on the Kafka message with type as its key, and the value of the eventType column as its value.

To emit the eventType column value in the outbox message envelope, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type

To control which partition the outbox message is produced on, configure the SMT like this:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition

Note that for the partition placement, adding an alias will have no effect.
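Because the placement option accepts a comma-separated list (see Configuration options), several placements can be combined in a single setting. A minimal sketch, assuming that the outbox table also has a hypothetical tenantId column:

```properties
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# eventType is emitted as a header named "type";
# tenantId (hypothetical column) is added to the envelope under its own name
transforms.outbox.table.fields.additional.placement=eventType:header:type,tenantId:envelope
```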

Configuration options

The following table describes the options that you can specify for the outbox event router SMT. In the table, the Group column indicates a configuration option classification for Kafka.

Table 2. Descriptions of outbox event router SMT configuration options
Option Default Group Description

warn

Table

Determines the behavior of the SMT when there is an UPDATE operation on the outbox table. Possible settings are:

  • warn - The SMT logs a warning and continues to the next outbox table record.

  • error - The SMT logs an error and continues to the next outbox table record.

  • fatal - The SMT logs an error and the connector stops processing.

All changes in an outbox table are expected to be INSERT operations. That is, an outbox table functions as a queue; updates to records in an outbox table are not allowed. The SMT automatically filters out DELETE operations on an outbox table.

id

Table

Specifies the outbox table column that contains the unique event ID. This ID will be stored in the emitted event’s headers under the id key.

aggregateid

Table

Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

Table

By default, the timestamp in the emitted outbox message is the Debezium event timestamp. To use a different timestamp in outbox messages, set this option to an outbox table column that contains the timestamp that you want to be in emitted outbox messages.

payload

Table

Specifies the outbox table column that contains the event payload.

false

Table

Specifies whether the SMT expands a String payload into JSON. If no content is found, or if a parsing error occurs, the content is kept "as is".

For more details, see Expanding escaped JSON String as JSON.

ignore

Table

When the JSON expansion property table.expand.json.payload is enabled, determines how the SMT handles a JSON payload in the outbox table that includes a null value. Possible settings are:

  • ignore - Ignore the null value.

  • optional_bytes - Keep the null value, and treat it as optional bytes in Kafka Connect.

Table, Envelope

Specifies one or more outbox table columns that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:

id:header,my-field:envelope

To specify an alias for the column, specify a trio with the alias as the third value, for example:

id:header,my-field:envelope:my-alias

The second value is the placement. It must be one of the placement values listed in Emitting messages with additional fields: header, envelope, or partition.

true

Table, Envelope

Specifies whether this transformation throws an error if a field specified by the table.fields.additional.placement property is not found in the Outbox payload.

Table, Schema

When set, this value is used as the schema version as described in the Kafka Connect Schema Javadoc.

aggregatetype

Router

Specifies the name of a column in the outbox table. The default behavior is that the value in this column becomes a part of the name of the topic to which the connector emits the outbox messages. An example is in the description of the expected outbox table.

(?<routedByValue>.*)

Router

Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox table records. This regular expression is part of the setting of the route.topic.replacement SMT option.

The default behavior is that the SMT replaces the default ${routedByValue} variable in the setting of the route.topic.replacement SMT option with the setting of the route.by.field outbox SMT option.

outbox.event.${routedByValue}

Router

Specifies the name of the topic to which the connector emits outbox messages. The default topic name is outbox.event. followed by the aggregatetype column value in the outbox table record. For example, if the aggregatetype value is customers, the topic name is outbox.event.customers.

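To change the topic name, you can set the route.by.field option to a different column, or set the route.topic.replacement option to a different topic name.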

false

Router

Indicates whether an empty or null payload causes the connector to emit a tombstone event.

tracingspancontext

Tracing

The name of the field containing tracing span context.

debezium-read

Tracing

The operation name representing the Debezium processing span.

false

Tracing

When true, only events that have a serialized context field are traced.
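As an illustration of the router options in the preceding table, the following sketch keeps the default aggregatetype routing column but changes the topic naming scheme. With this setting, a record whose aggregatetype value is customers is emitted to a topic named customers.events instead of outbox.event.customers. The .events suffix is an arbitrary choice for this example:

```properties
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Route by the default column, stated explicitly here for clarity
transforms.outbox.route.by.field=aggregatetype
# ${routedByValue} is replaced with the value of the routing column
transforms.outbox.route.topic.replacement=${routedByValue}.events
```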

Distributed tracing

The outbox event router SMT supports distributed tracing. See the tracing documentation for more details.