Outbox as in that folder in my email client? No, not exactly but there are some similarities!
The term outbox describes a pattern that allows independent components or services to perform read your own write semantics while concurrently providing a reliable, eventually consistent view to those writes across component or service boundaries.
You can read more about the Outbox pattern and how it applies to microservices in our blog post, Reliable Microservices Data Exchange With the Outbox Patttern.
So what exactly is an Outbox Event Router?
In Debezium version 0.9.3.Final, we introduced a ready-to-use Single Message Transform (SMT) that builds on the Outbox pattern to propagate data change events using Debezium and Kafka. Please see the documentation for details on how to use this transformation.
Going Supersonic with Quarkus!
Quarkus is a Kubernetes Native Java framework that is tailored for GraalVM and HotSpot using the best-of-breed Java technologies and standards. Quarkus aims to offer developers a unified reactive and imperative programming model to address a wide range of application architectures.
So what does all this mean exactly in laymen’s terms?
In short, the Debezium community can now leverage the Outbox pattern in a Quarkus-based application using a ready-to-use extension that works in parallel with your Debezium connector to emit change data events. The Debezium Outbox extension for Quarkus can be used in both JVM or Native image modes in Quarkus.
How to get it?
Currently the dependency must be manually added to your Quarkus application’s pom.xml
as shown below. There are plans to make this extension available in the Quarkus extension catalogue as well as via Quarkus' Maven plugin in a future release.
<dependency>
<groupId>io.debezium.quarkus</groupId>
<artifactId>debezium-quarkus-outbox</artifactId>
<version>1.1.0.Alpha1</version>
</dependency>
At the time of this blog, the extension was released as 1.1.0.Alpha1. |
Using the extension
The Debezium Outbox extension uses the Observer pattern to monitor when the user application emits an object that implements the io.debezium.outbox.quarkus.ExportedEvent
interface. This allows the Quarkus application behavior to be completely decoupled from that of the extension.
Lets walk through a simple example where a service is responsible for storing newly created orders and then emits an event that could be used to notify other interested services that an order has been created.
So to get started, we’ll begin by first implementing OrderCreatedEvent
, an implementation of ExportedEvent
. This event is used to signal when an Order
has been saved by the OrderService
.
public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {
private final long orderId;
private final JsonNode payload;
private final Instant created;
public OrderCreatedEvent(Instant createdAt, Order order) {
this.orderId = order.getId();
this.payload = convertOrderToJsonNode(order);
this.created = createdAt;
}
@Override
public String getAggregateId() {
return String.valueOf(orderId);
}
@Override
public String getAggregateType() {
return "Order";
}
@Override
public JsonNode getPayload() {
return payload;
}
@Override
public String getType() {
return "OrderCreated";
}
@Override
public Instant getTimestamp() {
return created;
}
}
The ExportedEvent
interface is the contract that defines how a Quarkus application is to provide the extension with the data to persist to the outbox database table. This contract exposes several different values discussed below:
Aggregate Id
The aggregate id is used when emitting messages to Kafka as the message key to preserve message order. In this example, the OrderCreatedEvent
returns the order identifier.
The |
Aggregate Type
The aggregate type is a string-based value that is used to append to the Kafka topic name and also assists in routing of the given message inside the Outbox Event Router SMT. In this example, we use Order
and when using the default configuration of the SMT, messages would be found in the outbox.event.Order
topic. Please see the route.topic.replacement
in the SMT configuration options for more details.
Type
The message type is a string value that is emitted in the Kafka message’s envelope. In this example, the value in the message envelope would be OrderCreated
.
Timestamp
By default, the Outbox Event Router SMT emits outbox events using the current timestamp when processing records but this may not always be sufficient for every use case. This field allows the source application to specify an Instant
that can then be configured through the SMT configuration options to be used as the Kafka message timestamp instead.
Payload
The payload is the message content or value and is what is consumed by consumers of the Kafka topic.
The |
If multiple implementations of |
By itself, this OrderCreatedEvent
does nothing on its own.
Next we want to implement an application component that is responsible for persisting the order to the database and then to emit the OrderCreatedEvent
event. The OrderService
class below uses JPA to persist the Order
entity and then javax.enterprise.event.Event<T>
to notify the outbox extension.
@ApplicationScoped
public class OrderService {
@Inject
EntityManager entityManager;
@Inject
Event<ExportedEvent<String, JsonNode>> event;
@Transactional
public Order addOrder(Order order) {
entityManager.persist(order);
event.fire(new OrderCreatedEvent(Instant.now(), order));
return order;
}
}
Before starting the application, certain configuration settings must be specified in application.properties
. An example configuration might look like the following where we specify the database to connect to as well as how the persistence provider, Hibernate, is to operate.
quarkus.datasource.driver=org.postgresql.Driver
quarkus.datasource.url=jdbc:postgresql://order-db:5432/orderdb?currentSchema=orders
quarkus.datasource.username=user
quarkus.datasource.password=password
quarkus.hibernate-orm.database.generation=update
quarkus.hibernate-orm.dialect=org.hibernate.dialect.PostgreSQLDialect
quarkus.hibernate-orm.log.sql=true
By starting the application with this configuration the outbox table OutboxEvent
will be created in the orders
schema of the the order-db
database with the following layout:
orderdb=# \d orders.outboxevent
Table "orders.outboxevent"
Column | Type | Collation | Nullable | Default
---------------+-----------------------------+-----------+----------+---------
id | uuid | | not null |
aggregatetype | character varying(255) | | not null |
aggregateid | character varying(255) | | not null |
type | character varying(255) | | not null |
timestamp | timestamp without time zone | | not null |
payload | character varying(8000) | | |
Indexes:
"outboxevent_pkey" PRIMARY KEY, btree (id)
When using |
Should the table or column names not fit your naming convention, they can be customized with several build-time configuration options. For example, if you wanted the table to be named outbox
rather than outboxevent
add the following line to the application.properties
file:
quarkus.debezium-outbox.table-name=outbox
If you enabled SQL logging or check the row count of the outbox table, you might find it unusual that after saving the order that a record is inserted into the outbox table but then is immediately deleted. This is the default behavior since rows are not required to be retained for Debezium to pick up the change.
If row retention is required, this can be configured using a run-time configuration option. In order to enable row retention, add the following configuration to the application.properties
file.
quarkus.debezium-outbox.remove-after-insert=false
Setting up the connector
Up to this point we’ve covered how to configure and use the extension in a Quarkus application to save events into the outbox database table. The last step is to configure the Debezium connector to monitor the outbox and emit those records to Kafka.
We’re going to use the following connector configuration:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "order-db",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "orderdb",
"database.server.name": "dbserver1",
"schema.whitelist" : "orders",
"table.whitelist": "orders.outboxevent",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events",
"transforms.outbox.table.field.event.timestamp": "timestamp",
"transforms.outbox.table.fields.additional.placement": "type:header:eventType"
}
A vast majority of this is standard Debezium connector configuration, but what is important are the last several lines that begin with transforms. These are configuration options that are used by Kafka Connect to configure and call the Outbox Event Router SMT.
This configuration uses a custom This configuration also specifies the Please see Outbox Event Router Configuration Options for details on how to configure the SMT. |
Once the connector is running, the Order.events
topic will be populated with messages from the outbox table. The following JSON example represents an Order
which gets saved by the OrderService
.
{
"customerId" : "123",
"orderDate" : "2019-01-31T12:13:01",
"lineItems" : [
{
"item" : "Debezium in Action",
"quantity" : 2,
"totalPrice" : 39.98
},
{
"item" : "Debezium for Dummies",
"quantity" : 1,
"totalPrice" : 29.99
}
]
}
When examining the Order.events
topic, the event emitted will look like the following:
{
"key": "1",
"headers": "id=cc74eac7-176b-44e7-8bda-413a5088ca66,eventType=OrderCreated"
}
"{\"id\":1,\"customerId\":123,\"orderDate\":\"2019-01-31T12:13:01\",\"lineItems\":[{\"id\":1,\"item\":\"Debezium in Action\",\"quantity\":2,\"totalPrice\":39.98,\"status\":\"ENTERED\"},{\"id\":2,\"item\":\"Debezium for Dummies\",\"quantity\":1,\"totalPrice\":29.99,\"status\":\"ENTERED\"}]}"
Wrapping up
It is really simple and easy to setup and use the Debezium Outbox extension.
We have a complete example in our examples repository that uses the order service described here as well as a shipment service that consumes the events. For more details on the extension, refer to the Outbox Quarkus Extension documentation.
Future Plans
The current implementation of the Debezium Outbox extension works quite well, but we acknowledge there is still room for improvement. Some of the things we’ve already identified and have plans to include in future iterations of the extension are:
-
Avro serialization support for event payload
-
Full outbox table column attribute control, e.g. definition, length, precision, scale, and converters.
-
Complete outbox table customization using a user-supplied entity class.
-
Allow varied signatures of
ExportedEvent
within a single application.
We are currently tracking all future changes to this extension in DBZ-1711. As always we welcome any and all feedback, so feel free to let us know in that issue, on Gitter, or the mailing lists.
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve ours existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.