Outbox as in that folder in my email client? No, not exactly, but there are some similarities!

The term outbox describes a pattern that allows independent components or services to have read-your-own-writes semantics while also providing a reliable, eventually consistent view of those writes across component or service boundaries.

You can read more about the Outbox pattern and how it applies to microservices in our blog post, Reliable Microservices Data Exchange With the Outbox Pattern.

So what exactly is an Outbox Event Router?

In Debezium version 0.9.3.Final, we introduced a ready-to-use Single Message Transform (SMT) that builds on the Outbox pattern to propagate data change events using Debezium and Kafka. Please see the documentation for details on how to use this transformation.

Going Supersonic with Quarkus!

Quarkus is a Kubernetes Native Java framework that is tailored for GraalVM and HotSpot using the best-of-breed Java technologies and standards. Quarkus aims to offer developers a unified reactive and imperative programming model to address a wide range of application architectures.

So what does all this mean exactly in layman’s terms?

In short, the Debezium community can now leverage the Outbox pattern in a Quarkus-based application using a ready-to-use extension that works in parallel with your Debezium connector to emit change data events. The Debezium Outbox extension for Quarkus can be used in both JVM and native image modes in Quarkus.

How to get it?

Currently the dependency must be manually added to your Quarkus application’s pom.xml as shown below. There are plans to make this extension available in the Quarkus extension catalogue as well as via Quarkus' Maven plugin in a future release.

<dependency>
  <groupId>io.debezium.quarkus</groupId>
  <artifactId>debezium-quarkus-outbox</artifactId>
  <version>1.1.0.Alpha1</version>
</dependency>

At the time of writing, the extension was released as 1.1.0.Alpha1.
A newer version of the extension may be available; see Releases for details.

Using the extension

The Debezium Outbox extension uses the Observer pattern to monitor when the user application emits an object that implements the io.debezium.outbox.quarkus.ExportedEvent interface. This allows the Quarkus application behavior to be completely decoupled from that of the extension.

Let’s walk through a simple example where a service is responsible for storing newly created orders and then emits an event that could be used to notify other interested services that an order has been created.

To get started, we’ll implement OrderCreatedEvent, an implementation of ExportedEvent. This event signals that an Order has been saved by the OrderService.

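import java.time.Instant;

import com.fasterxml.jackson.databind.JsonNode;

import io.debezium.outbox.quarkus.ExportedEvent;

// Outbox event that signals an Order has been saved.
public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {
    private final long orderId;
    private final JsonNode payload;
    private final Instant created;

    public OrderCreatedEvent(Instant createdAt, Order order) {
        this.orderId = order.getId();
        // convertOrderToJsonNode is a helper that is not shown here.
        this.payload = convertOrderToJsonNode(order);
        this.created = createdAt;
    }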

    @Override
    public String getAggregateId() {
        return String.valueOf(orderId);
    }

    @Override
    public String getAggregateType() {
        return "Order";
    }

    @Override
    public JsonNode getPayload() {
        return payload;
    }

    @Override
    public String getType() {
        return "OrderCreated";
    }

    @Override
    public Instant getTimestamp() {
        return created;
    }
}

The ExportedEvent interface is the contract that defines how a Quarkus application is to provide the extension with the data to persist to the outbox database table. This contract exposes several different values discussed below:

Aggregate Id

The aggregate id is used when emitting messages to Kafka as the message key to preserve message order. In this example, the OrderCreatedEvent returns the order identifier.

The ExportedEvent interface is parameterized, and its first type parameter specifies the return type of the aggregate id. While this example uses a String, the value returned can be any persistable object type.
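
To illustrate why a stable message key preserves ordering, here is a minimal sketch of key-based partition selection. It is a simplified stand-in rather than Kafka's real partitioner (which hashes the serialized key bytes differently); the class KeyPartitioning and its method are hypothetical names used only for this illustration.

```java
/** Hypothetical illustration; not Kafka's actual partitioner. */
final class KeyPartitioning {

    private KeyPartitioning() {
    }

    /**
     * Maps a message key to a partition index. The same key always maps to
     * the same partition, so events for one aggregate stay in order.
     */
    static int partitionFor(String aggregateId, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(aggregateId.hashCode(), partitionCount);
    }
}
```

Because every event for order 1 carries the key "1", all of them land in the same partition, and a consumer of that partition sees them in the order they were written.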

Aggregate Type

The aggregate type is a string value that is appended to the Kafka topic name and also helps route the message inside the Outbox Event Router SMT. In this example, we use Order; with the default SMT configuration, messages would be found in the outbox.event.Order topic. Please see route.topic.replacement in the SMT configuration options for more details.
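
As a rough sketch of how the topic name follows from the aggregate type, the snippet below substitutes the aggregate type into a topic template. It assumes the default template is outbox.event.${routedByValue}, consistent with the topic name above; the class TopicRouting and its method are hypothetical, and the real SMT resolves the topic through its own configuration rather than this code.

```java
/** Hypothetical helper; a simplification of the SMT's topic routing. */
final class TopicRouting {

    private static final String PLACEHOLDER = "${routedByValue}";

    private TopicRouting() {
    }

    /** Replaces the placeholder in the template with the event's aggregate type. */
    static String route(String template, String aggregateType) {
        return template.replace(PLACEHOLDER, aggregateType);
    }
}
```

With the assumed default template, route("outbox.event.${routedByValue}", "Order") yields outbox.event.Order, while a template such as ${routedByValue}.events would yield Order.events.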

Type

The message type is a string value that is emitted in the Kafka message’s envelope. In this example, the value in the message envelope would be OrderCreated.

Timestamp

By default, the Outbox Event Router SMT emits outbox events using the current timestamp when processing records, but this may not be sufficient for every use case. This field allows the source application to specify an Instant that the SMT can be configured to use as the Kafka message timestamp instead.

Payload

The payload is the message content or value and is what is consumed by consumers of the Kafka topic.

The ExportedEvent interface is parameterized, and its second type parameter specifies the return type of the payload. While this example uses a JsonNode to store a JSON representation of the Order, the payload can be any persistable object type.

If multiple implementations of ExportedEvent exist in a Quarkus application, they must all use the same signature. If different signatures are required, the code should be split into different Quarkus applications because all ExportedEvent implementations will be stored in the same database outbox table for a given Quarkus application. We are currently investigating alternatives to loosen this restriction in a future release to allow multiple variants within the same application.

By itself, OrderCreatedEvent does nothing.

Next, we want to implement an application component that persists the order to the database and then emits the OrderCreatedEvent. The OrderService class below uses JPA to persist the Order entity and then javax.enterprise.event.Event<T> to notify the outbox extension.

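import java.time.Instant;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;

import com.fasterxml.jackson.databind.JsonNode;

import io.debezium.outbox.quarkus.ExportedEvent;

@ApplicationScoped
public class OrderService {
    @Inject
    EntityManager entityManager;

    @Inject
    Event<ExportedEvent<String, JsonNode>> event;

    // Persists the order and fires the outbox event in one transactional method.
    @Transactional
    public Order addOrder(Order order) {
        entityManager.persist(order);
        event.fire(new OrderCreatedEvent(Instant.now(), order));
        return order;
    }
}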
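
The key property of addOrder is that the order and its event are written together or not at all. Since the method is annotated with @Transactional and fire() delivers CDI events synchronously, the outbox write is expected to join the same transaction as the order. The sketch below models that all-or-nothing behavior with in-memory lists; UnitOfWork is a hypothetical class and is not how JPA or the extension work internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a database transaction. */
final class UnitOfWork {
    private final List<String> committedOrders;
    private final List<String> committedOutbox;
    private final List<String> pendingOrders = new ArrayList<>();
    private final List<String> pendingOutbox = new ArrayList<>();

    UnitOfWork(List<String> committedOrders, List<String> committedOutbox) {
        this.committedOrders = committedOrders;
        this.committedOutbox = committedOutbox;
    }

    /** Buffers the business write until commit. */
    void persistOrder(String order) {
        pendingOrders.add(order);
    }

    /** Buffers the outbox write alongside the business write. */
    void recordEvent(String event) {
        pendingOutbox.add(event);
    }

    /** Makes both writes visible together. */
    void commit() {
        committedOrders.addAll(pendingOrders);
        committedOutbox.addAll(pendingOutbox);
        clearPending();
    }

    /** Discards both writes together. */
    void rollback() {
        clearPending();
    }

    private void clearPending() {
        pendingOrders.clear();
        pendingOutbox.clear();
    }
}
```

If the unit of work is rolled back, neither the order nor the event becomes visible, so other services are never notified about an order that was not saved.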

Before starting the application, certain configuration settings must be specified in application.properties. An example configuration might look like the following where we specify the database to connect to as well as how the persistence provider, Hibernate, is to operate.

quarkus.datasource.driver=org.postgresql.Driver
quarkus.datasource.url=jdbc:postgresql://order-db:5432/orderdb?currentSchema=orders
quarkus.datasource.username=user
quarkus.datasource.password=password
quarkus.hibernate-orm.database.generation=update
quarkus.hibernate-orm.dialect=org.hibernate.dialect.PostgreSQLDialect
quarkus.hibernate-orm.log.sql=true

When the application starts with this configuration, the outbox table OutboxEvent is created in the orders schema of the orderdb database with the following layout:

orderdb=# \d orders.outboxevent
                        Table "orders.outboxevent"
    Column     |            Type             | Collation | Nullable | Default
---------------+-----------------------------+-----------+----------+---------
 id            | uuid                        |           | not null |
 aggregatetype | character varying(255)      |           | not null |
 aggregateid   | character varying(255)      |           | not null |
 type          | character varying(255)      |           | not null |
 timestamp     | timestamp without time zone |           | not null |
 payload       | character varying(8000)     |           |          |
Indexes:
    "outboxevent_pkey" PRIMARY KEY, btree (id)

When using JsonNode as the payload return type, the extension uses a JPA attribute converter to store the contents as a string in the database.

Should the table or column names not fit your naming convention, they can be customized with several build-time configuration options. For example, if you want the table to be named outbox rather than outboxevent, add the following line to the application.properties file:

quarkus.debezium-outbox.table-name=outbox

If you enable SQL logging or check the row count of the outbox table, you might find it unusual that, after the order is saved, a record is inserted into the outbox table and then immediately deleted. This is the default behavior, since rows do not need to be retained for Debezium to pick up the change.

If row retention is required, it can be configured using a run-time configuration option. To enable row retention, add the following configuration to the application.properties file.

quarkus.debezium-outbox.remove-after-insert=false
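
Deleting the row right away is safe because a log-based connector reads the database's transaction log rather than the table's current contents. The sketch below models this with in-memory lists: the table ends up empty, yet the log still holds the insert. All names here are hypothetical, and the sketch assumes the reader only acts on inserts and skips deletes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of an outbox table and its transaction log. */
final class OutboxLogDemo {

    enum Op { INSERT, DELETE }

    /** One entry in the simulated transaction log. */
    static final class LogEntry {
        final Op op;
        final String eventId;

        LogEntry(Op op, String eventId) {
            this.op = op;
            this.eventId = eventId;
        }
    }

    private OutboxLogDemo() {
    }

    /** Inserts the event row and removes it again; both changes reach the log. */
    static void emit(List<LogEntry> log, List<String> table, String eventId) {
        table.add(eventId);
        log.add(new LogEntry(Op.INSERT, eventId));
        table.remove(eventId);
        log.add(new LogEntry(Op.DELETE, eventId));
    }

    /** Reads the log like a change data capture reader that keeps only inserts. */
    static List<String> captured(List<LogEntry> log) {
        List<String> events = new ArrayList<>();
        for (LogEntry entry : log) {
            if (entry.op == Op.INSERT) {
                events.add(entry.eventId);
            }
        }
        return events;
    }
}
```

After emit runs, the table is empty while captured still returns the event id, which is why the outbox table does not grow under the default setting.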

Setting up the connector

Up to this point we’ve covered how to configure and use the extension in a Quarkus application to save events into the outbox database table. The last step is to configure the Debezium connector to monitor the outbox and emit those records to Kafka.

We’re going to use the following connector configuration:

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "order-db",
  "database.port": "5432",
  "database.user": "user",
  "database.password": "password",
  "database.dbname": "orderdb",
  "database.server.name": "dbserver1",
  "schema.whitelist" : "orders",
  "table.whitelist": "orders.outboxevent",
  "tombstones.on.delete": "false",
  "transforms": "outbox",
  "transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
  "transforms.outbox.table.field.event.timestamp": "timestamp",
  "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
}

The vast majority of this is standard Debezium connector configuration; the important parts are the last several lines that begin with transforms. Kafka Connect uses these options to configure and call the Outbox Event Router SMT.
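
One way to hand this configuration to Kafka Connect is its REST API, where PUT /connectors/{name}/config creates or updates a connector from a flat configuration map like the one above. The sketch below only builds such a request using the HTTP client API available since Java 11. The base URL, the connector name outbox-connector, and the class ConnectorRegistration are assumptions made for this illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that prepares a Kafka Connect registration request. */
final class ConnectorRegistration {

    private ConnectorRegistration() {
    }

    /** Builds a PUT request carrying the connector configuration as JSON. */
    static HttpRequest buildRequest(String connectBaseUrl, String connectorName, String configJson) {
        URI uri = URI.create(connectBaseUrl + "/connectors/" + connectorName + "/config");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(configJson))
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), for example against a Kafka Connect instance assumed to listen at http://localhost:8083.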

This configuration uses a custom route.topic.replacement configuration property. This setting routes OrderCreatedEvent rows from the outbox to the Order.events topic rather than the default outbox.event.Order topic.

This configuration also specifies the table.field.event.timestamp configuration property. This setting populates the Kafka message timestamp from the timestamp field in the outbox database table rather than from the current time when the row is processed.

Please see Outbox Event Router Configuration Options for details on how to configure the SMT.

Once the connector is running, the Order.events topic will be populated with messages from the outbox table. The following JSON example represents an Order which gets saved by the OrderService.

{
    "customerId" : "123",
    "orderDate" : "2019-01-31T12:13:01",
    "lineItems" : [
        {
            "item" : "Debezium in Action",
            "quantity" : 2,
            "totalPrice" : 39.98
        },
        {
            "item" : "Debezium for Dummies",
            "quantity" : 1,
            "totalPrice" : 29.99
        }
    ]
}

When examining the Order.events topic, the event emitted will look like the following:

{
  "key": "1",
  "headers": "id=cc74eac7-176b-44e7-8bda-413a5088ca66,eventType=OrderCreated"
}
"{\"id\":1,\"customerId\":123,\"orderDate\":\"2019-01-31T12:13:01\",\"lineItems\":[{\"id\":1,\"item\":\"Debezium in Action\",\"quantity\":2,\"totalPrice\":39.98,\"status\":\"ENTERED\"},{\"id\":2,\"item\":\"Debezium for Dummies\",\"quantity\":1,\"totalPrice\":29.99,\"status\":\"ENTERED\"}]}"
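
The headers line above is a display rendering; a real consumer would read headers through its Kafka client API. Purely for illustration, this sketch parses the rendered key=value form so that a consumer could branch on eventType. The class HeaderParsing is a hypothetical name, and the parser assumes that header values contain neither commas nor equals signs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for the rendered "key=value,key=value" header form. */
final class HeaderParsing {

    private HeaderParsing() {
    }

    /** Splits the rendered header string into an ordered map of names to values. */
    static Map<String, String> parse(String headers) {
        Map<String, String> result = new LinkedHashMap<>();
        if (headers == null || headers.isEmpty()) {
            return result;
        }
        for (String pair : headers.split(",")) {
            int separator = pair.indexOf('=');
            // Skip fragments without a name before the equals sign.
            if (separator > 0) {
                result.put(pair.substring(0, separator).trim(), pair.substring(separator + 1).trim());
            }
        }
        return result;
    }
}
```

For the event shown above, parse(...).get("eventType") returns OrderCreated, which a consuming service could use to decide how to handle the payload.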

Wrapping up

Setting up and using the Debezium Outbox extension is straightforward.

We have a complete example in our examples repository that uses the order service described here as well as a shipment service that consumes the events. For more details on the extension, refer to the Outbox Quarkus Extension documentation.

Future Plans

The current implementation of the Debezium Outbox extension works quite well, but we acknowledge there is still room for improvement. Some of the things we’ve already identified and have plans to include in future iterations of the extension are:

  • Avro serialization support for event payload

  • Full outbox table column attribute control, e.g. definition, length, precision, scale, and converters

  • Complete outbox table customization using a user-supplied entity class

  • Allow varied signatures of ExportedEvent within a single application

We are currently tracking all future changes to this extension in DBZ-1711. As always we welcome any and all feedback, so feel free to let us know in that issue, on Gitter, or the mailing lists.

Chris Cranford

Chris is a software engineer at Red Hat. He previously was a member of the Hibernate ORM team and now works on Debezium. He lives in North Carolina just a few hours from Red Hat towers.

   
