Tutorial for Using Debezium Connectors With Apache Pulsar

This is a guest post by Apache Pulsar PMC Member and Committer Jia Zhai.

Debezium is an open source project for change data capture (CDC). It is built on Apache Kafka Connect and supports multiple databases, such as MySQL, MongoDB, PostgreSQL, Oracle, and SQL Server. Apache Pulsar includes a set of built-in connectors based on the Pulsar IO framework, which is the counterpart to Kafka Connect.

As of version 2.3.0, Pulsar IO comes with support for the Debezium source connectors out of the box, so you can leverage Debezium to stream changes from your databases into Apache Pulsar. This tutorial walks you through setting up the Debezium connector for MySQL with Pulsar IO.

Tutorial Steps

This tutorial is similar to the Debezium tutorial, except that the event streams are stored in Pulsar rather than Kafka. It consists of six steps:

  1. Start a MySQL server;

  2. Start standalone Pulsar service;

  3. Start the Debezium connector in Pulsar IO, which reads change events from the MySQL server;

  4. Subscribe to Pulsar topics to monitor MySQL changes;

  5. Make changes in MySQL server, and verify that changes are recorded in Pulsar topics immediately;

  6. Clean up.

Step 1: Start a MySQL server

Start a MySQL server that contains an example database from which Debezium can capture changes. Open a new terminal and start a container that runs a MySQL database server pre-configured with a database named inventory:

docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

The following information is displayed:

2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events
2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.
Version: '5.7.25-log'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)

Step 2: Start standalone Pulsar service

Start the Pulsar service locally in standalone mode. Support for running Debezium connectors in Pulsar IO was introduced in Pulsar 2.3.0. Download the Pulsar 2.3.0 binary release along with pulsar-io-kafka-connect-adaptor-2.3.0.nar; in Pulsar, all Pulsar IO connectors are packaged as separate NAR files.

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar
$ tar zxf apache-pulsar-2.3.0-bin.tar.gz
$ cd apache-pulsar-2.3.0
$ mkdir connectors
$ cp ../pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors
$ bin/pulsar standalone

Step 3: Start the Debezium MySQL connector in Pulsar IO

In another terminal tab, start the Debezium MySQL connector in Pulsar IO in local run mode. The “debezium-mysql-source-config.yaml” file contains the whole configuration; the main parameters are listed under the “configs” node. Besides the "task.class" parameter, the configuration includes MySQL-related parameters (such as host, port, user, and password) and the names of two Pulsar topics for "history" and "offset" storage.

$ bin/pulsar-admin source localrun  --sourceConfigFile debezium-mysql-source-config.yaml

The content in the “debezium-mysql-source-config.yaml” file is as follows.

tenant: "test"
namespace: "test-namespace"
name: "debezium-kafka-source"
topicName: "kafka-connect-topic"
archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar"

parallelism: 1

configs:
  ## sourceTask
  task.class: "io.debezium.connector.mysql.MySqlConnectorTask"

  ## config for mysql, docker image: debezium/example-mysql:0.9
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "dbz"
  database.server.id: "184054"
  database.server.name: "dbserver1"
  database.whitelist: "inventory"

  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
  database.history.pulsar.topic: "history-topic"
  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
  ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  ## PULSAR_SERVICE_URL_CONFIG
  pulsar.service.url: "pulsar://127.0.0.1:6650"
  ## OFFSET_STORAGE_TOPIC_CONFIG
  offset.storage.topic: "offset-topic"

Tables were created automatically in the aforementioned MySQL server, so the Debezium connector reads the history records from the MySQL binlog from the beginning. In the output you will find that the connector has been triggered and has processed 47 records.

For more information on how to manage connectors, see the Pulsar IO documentation.

Records captured and read by Debezium are automatically published to Pulsar topics. Open a new terminal and list the current topics in Pulsar with the following command:

$ bin/pulsar-admin topics list public/default

For each table that has changed, the change data is stored in a separate Pulsar topic. In addition to the table-related topics, two more topics named “history-topic” and “offset-topic” store the history and offset data:

persistent://public/default/history-topic
persistent://public/default/offset-topic

Step 4: Subscribe to Pulsar topics to monitor MySQL changes

Take the persistent://public/default/dbserver1.inventory.products topic as an example. Use the following CLI command to consume this topic and monitor the events emitted as the “products” table changes:

$ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0

The output is as follows:

…
22:17:41.201 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650]
22:17:41.223 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0

You can also consume the offset topic to monitor the offsets recorded as the table changes are stored in the persistent://public/default/dbserver1.inventory.products topic:

$ bin/pulsar-client consume -s "sub-offset" offset-topic -n 0
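
Instead of the CLI, you can also consume these topics programmatically. The following is a minimal sketch using the Pulsar Java client; it assumes the pulsar-client library is on the classpath, and the class and subscription names are purely illustrative:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class ProductsChangeListener {

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://127.0.0.1:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/dbserver1.inventory.products")
                .subscriptionName("sub-products-java")
                .subscribe();

        while (true) {
            // receive() blocks until the next change event arrives
            Message<byte[]> msg = consumer.receive();
            System.out.println(new String(msg.getData()));
            consumer.acknowledge(msg);
        }
    }
}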

Step 5: Make changes in MySQL server, and verify that changes are recorded in Pulsar topics immediately

Start a MySQL CLI Docker container so that you can make changes to the “products” table in the MySQL server:

$ docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

After running the command, the MySQL CLI is displayed, and you can change the names of two items in the “products” table:

mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products;
mysql> UPDATE products SET name='1111111111' WHERE id=101;
mysql> UPDATE products SET name='1111111111' WHERE id=107;

In the terminal where you consume the products topic, you will see that two change events have arrived.

In the terminal where you consume the offset topic, you will see that two offsets have been added.

In the terminal where you run the connector locally, you will see that two more records have been processed.

Step 6: Clean up

Use “Ctrl + C” to close the terminals, then use “docker ps” and “docker kill” to stop the MySQL-related containers:

mysql> quit

$ docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                               NAMES
84d66c2f591d        debezium/example-mysql:0.8   "docker-entrypoint.s…"   About an hour ago   Up About an hour    0.0.0.0:3306->3306/tcp, 33060/tcp   mysql

$ docker kill 84d66c2f591d

To delete the Pulsar data, delete the data directory in the Pulsar installation directory:

$ pwd
/Users/jia/ws/releases/apache-pulsar-2.3.0

$ rm -rf data

Conclusion

The Pulsar IO framework allows you to run the Debezium connectors for change data capture, streaming data changes from different databases into Apache Pulsar. In this tutorial you have learned how to capture data changes in a MySQL database and propagate them to Pulsar. We are continuously improving support for running the Debezium connectors with Apache Pulsar, and it will become much easier to use with the Pulsar 2.4.0 release.


Debezium meets Quarkus

Last week’s announcement of Quarkus sparked a great amount of interest in the Java community: crafted from best-of-breed Java libraries and standards, it allows you to build Kubernetes-native applications based on GraalVM and OpenJDK HotSpot. In this blog post we are going to demonstrate how a Quarkus-based microservice can consume Debezium’s data change events via Apache Kafka. For that purpose, we’ll see what it takes to convert the shipment microservice from our recent post about the outbox pattern into a Quarkus-based service.

Quarkus is a Java stack designed for the development of cloud-native applications based on the Java platform. It combines and tightly integrates mature libraries such as Hibernate ORM, Vert.x, Netty, RESTEasy and Apache Camel, as well as the APIs from the Eclipse MicroProfile initiative, such as Config or Reactive Messaging. Using Quarkus, you can develop applications using both imperative and reactive styles, combining both approaches as needed.

It is designed for significantly reduced memory consumption and improved startup time. Last but not least, Quarkus supports both OpenJDK HotSpot and GraalVM virtual machines. With GraalVM it is possible to compile the application into a native binary and thus reduce the resource consumption and startup time even more.

To learn more about Quarkus itself, we recommend taking a look at its excellent Getting Started guide.

Consuming Kafka Messages with Quarkus

In the original example application demonstrating the outbox pattern, there was a microservice ("shipment") based on Thorntail that consumed the events produced by the Debezium connector. We’ve extended the example with a new service named "shipment-service-quarkus". It provides the same functionality as the "shipment-service" but is implemented as a microservice based on Quarkus instead of Thorntail.

This makes the overall architecture look like so:

Outbox Pattern Overview

To retrofit the original service into a Quarkus-based application, only a few changes were needed:

  • Quarkus right now supports only MariaDB but not MySQL; hence we have included an instance of MariaDB to which the service is writing

  • The JSON-P API used to deserialize incoming JSON messages currently cannot be used without RESTEasy (see issue #1480, which should be fixed soon), so the code has been modified to use the Jackson API instead

  • Instead of the Kafka consumer API, the Reactive Messaging API defined by MicroProfile is used to receive messages from Apache Kafka; as an implementation of that API, the one provided by the SmallRye project is used, which is bundled as a Quarkus extension

While the first two steps are mere technicalities, the Reactive Messaging API is a nice simplification over the polling loop in the original consumer. All that’s needed to consume messages from a Kafka topic is to annotate a method with @Incoming, and it will automatically be invoked when a new message arrives:

@ApplicationScoped
public class KafkaEventConsumer {

    @Incoming("orders")
    public CompletionStage<Void> onMessage(KafkaMessage<String, String> message)
            throws IOException {
        // handle message...

        return message.ack();
    }
}

The "orders" message source is configured via the MicroProfile Config API, which resolves it to the "OrderEvents" topic already known from the original outbox example.

Build Process

The build process is mostly the same as it was before. Instead of using the Thorntail Maven plug-in, the Quarkus Maven plug-in is used now.

The following Quarkus extensions are used:

  • io.quarkus:quarkus-hibernate-orm: support for Hibernate ORM and JPA

  • io.quarkus:quarkus-jdbc-mariadb: support for accessing MariaDB through JDBC

  • io.quarkus:quarkus-smallrye-reactive-messaging-kafka: support for accessing Kafka through the MicroProfile Reactive Messaging API

They pull in some other extensions too, e.g. quarkus-arc (the Quarkus CDI runtime) and quarkus-vertx (used by the reactive messaging support).

In addition, two more changes were needed:

  • A new build profile named native has been added; this is used to compile the service into a native binary image using the Quarkus Maven plug-in

  • The native-image.docker-build system property is enabled when running the build; this means that the native image build is done inside of a Docker container, so that GraalVM doesn’t have to be installed on the developer’s machine

All the heavy lifting is done by the Quarkus Maven plug-in, which is configured in pom.xml like so:

  <build>
    <finalName>shipment</finalName>
    <plugins>
      ...
      <plugin>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-maven-plugin</artifactId>
        <version>${version.quarkus}</version>
        <executions>
          <execution>
            <goals>
              <goal>build</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  ...
    <profile>
      <id>native</id>
      <build>
        <plugins>
          <plugin>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-maven-plugin</artifactId>
            <version>${version.quarkus}</version>
            <executions>
              <execution>
                <goals>
                  <goal>native-image</goal>
                </goals>
                <configuration>
                  <enableHttpUrlHandler>true</enableHttpUrlHandler>
                  <autoServiceLoaderRegistration>false</autoServiceLoaderRegistration>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>

Configuration

Like any Quarkus application, the shipment service is configured via the application.properties file:

quarkus.datasource.url: jdbc:mariadb://shipment-db-quarkus:3306/shipmentdb
quarkus.datasource.driver: org.mariadb.jdbc.Driver
quarkus.datasource.username: mariadbuser
quarkus.datasource.password: mariadbpw
quarkus.hibernate-orm.database.generation=drop-and-create
quarkus.hibernate-orm.log.sql=true

smallrye.messaging.source.orders.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.orders.topic=OrderEvents
smallrye.messaging.source.orders.bootstrap.servers=kafka:9092
smallrye.messaging.source.orders.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders.group.id=shipment-service-quarkus

In our case it contains:

  • the definition of a datasource (based on MariaDB) to which the shipment service writes its data,

  • the definition of a messaging source, which is backed by the "OrderEvents" Kafka topic, using the given bootstrap server, deserializers and Kafka consumer group id.
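
Individual properties can also be read in application code via the MicroProfile Config API mentioned earlier. A minimal sketch, where the class and field names are purely illustrative:

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;

@ApplicationScoped
public class DataSourceInfo {

    // injects the value of quarkus.datasource.url from application.properties
    @Inject
    @ConfigProperty(name = "quarkus.datasource.url")
    String datasourceUrl;
}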

Execution

The Docker Compose config file has been enriched with two services, MariaDB and the new Quarkus-based shipment service. So when docker-compose up is executed, two shipment services are started side-by-side: the original Thorntail-based one and the new one using Quarkus. When the order service receives a new purchase order and exports a corresponding event to Apache Kafka via the outbox table, that message is processed by both shipment services, as they use distinct consumer group ids.

Performance Numbers

The numbers are definitely not scientific, but provide a good indication of the order-of-magnitude difference between the native Quarkus-based application and the Thorntail service running on the JVM:

                                Quarkus service   Thorntail service
application package size [MB]   54                131
memory [MB]                     33.8              1257
start time [ms]                 260               5746

The memory data were obtained via the htop utility. The startup time was measured until the message about application readiness was printed. As with all performance measurements, you should run your own comparisons based on your set-up and workload to gain insight into the actual differences for your specific use cases.

Summary

In this post we have demonstrated that it is possible to consume Debezium-generated events in a Java application written with the Quarkus Java stack. We have also shown that it is possible to provide such an application as a native binary image, and we have provided back-of-the-envelope performance numbers demonstrating significant savings in resources.

If you’d like to see the awesomeness of deploying Java microservices as native images for yourself, you can find the complete source code of the implementation in the Debezium examples repo. If you have any questions or feedback, please let us know in the comments below; we are looking forward to hearing from you!

Many thanks to Guillaume Smet for reviewing an earlier version of this post!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Reliable Microservices Data Exchange With the Outbox Pattern

As part of their business logic, microservices often do not only have to update their own local data store, but they also need to notify other services about data changes that happened. The outbox pattern describes an approach for letting services execute these two tasks in a safe and consistent manner; it provides source services with instant "read your own writes" semantics, while offering reliable, eventually consistent data exchange across service boundaries.

If you’ve built a couple of microservices, you’ll probably agree that the hardest part about them is data: microservices don’t exist in isolation and very often they need to propagate data and data changes amongst each other.

For instance consider a microservice that manages purchase orders: when a new order is placed, information about that order may have to be relayed to a shipment service (so it can assemble shipments of one or more orders) and a customer service (so it can update things like the customer’s total credit balance based on the new order).

There are different approaches for making the other two services aware of new purchase orders; e.g. the order service could invoke some REST, gRPC or other (synchronous) API provided by these services. This might create some undesired coupling, though: the sending service must know which other services to invoke and where to find them. It also must be prepared for these services being temporarily unavailable. Service meshes such as Istio can help here, by providing capabilities like request routing, retries, circuit breakers and much more.

The general issue with any synchronous approach is that one service cannot really function without the other services it invokes. While buffering and retrying might help in cases where other services only need to be notified of certain events, this is not the case if a service actually needs to query other services for information. For instance, when a purchase order is placed, the order service might need to ask an inventory service how many units of the purchased item are in stock.

Another downside of such a synchronous approach is that it lacks re-playability, i.e. the possibility for new consumers to arrive after events have been sent and still be able to consume the entire event stream from the beginning.

Both problems can be addressed by using an asynchronous data exchange approach instead: i.e. having the order, inventory and other services propagate events through a durable message log such as Apache Kafka. By subscribing to these event streams, each service is notified about data changes in the other services. It can react to these events and, if needed, create a local representation of that data in its own data store, using a representation tailored towards its own needs. For instance, such a view might be denormalized to efficiently support specific access patterns, or it may only contain a subset of the original data that’s relevant to the consuming service.

Durable logs also support re-playability, i.e. new consumers can be added as needed, enabling use cases you might not have had in mind originally, and without touching the source service. E.g. consider a data warehouse which should keep information about all the orders ever placed, or some full-text search functionality on purchase orders based on Elasticsearch. Once the purchase order events are in a Kafka topic (Kafka’s topic retention settings can be used to ensure that events remain in a topic as long as they are needed for the given use cases and business requirements), new consumers can subscribe, process the topic from the very beginning and materialize a view of all the data in a microservice’s database, search index, data warehouse etc.

Dealing with Topic Growth

Depending on the amount of data (number and size of records, frequency of changes), it may or may not be feasible to keep events in topics for a long or even indefinite time. Very often, some or even all events pertaining to a given data item (e.g. a specific purchase order) might be eligible for deletion from a business point of view after a given point in time. See the box "Deletion of Events from Kafka Topics" further below for some more thoughts on the deletion of events from Kafka topics in order to keep their size within bounds.

The Issue of Dual Writes

In order to provide their functionality, microservices will typically have their own local data store. For instance, the order service may use a relational database to persist the information about purchase orders. When a new order is placed, this may result in an INSERT operation in a table PurchaseOrder in the service’s database. At the same time, the service may wish to send an event about the new order to Apache Kafka, so to propagate that information to other interested services.

Simply issuing these two requests may lead to potential inconsistencies, though. The reason is that we cannot have one shared transaction spanning both the service’s database and Apache Kafka, as the latter cannot be enlisted in distributed (XA) transactions. So in unfortunate circumstances it might happen that we end up with the new purchase order persisted in the local database, but without having sent the corresponding message to Kafka (e.g. due to some networking issue). Or, the other way around, we might have sent the message to Kafka but failed to persist the purchase order in the local database. Both situations are undesirable; either no shipment gets created for a seemingly successfully placed order, or a shipment gets created but then there’d be no trace of the corresponding purchase order in the order service itself.

So how can this situation be avoided? The answer is to only modify one of the two resources (the database or Apache Kafka) and drive the update of the second one based on that, in an eventually consistent manner. Let’s first consider the case of only writing to Apache Kafka.

When receiving a new purchase order, the order service would not do the INSERT into its database synchronously; instead, it would only send an event describing the new order to a Kafka topic. So only one resource gets modified at a time, and if something goes wrong with that, we’ll find out about it instantly and report back to the caller of the order service that the request failed.

At the same time, the service itself would subscribe to that Kafka topic. That way, it will be notified when a new message arrives in the topic and it can persist the new purchase order in its database. There’s one subtle challenge here, though, and that is the lack of "read your own writes" semantics. E.g. let’s assume the order service also has an API for searching for all the purchase orders of a given customer. When invoking that API right after placing a new order, due to the asynchronous nature of processing messages from the Kafka topic, it might happen that the purchase order has not yet been persisted in the service’s database and thus will not be returned by that query. That can lead to a very confusing user experience, as users may for instance miss newly placed orders in their shopping history. There are ways to deal with this situation, e.g. the service could keep newly placed purchase orders in memory and answer subsequent queries based on that. This quickly gets non-trivial, though, when implementing more complex queries or considering that the order service might also comprise multiple nodes in a clustered set-up, which would require propagation of that data within the cluster.

Now how would things look when only writing to the database synchronously and driving the export of a message to Apache Kafka based on that? This is where the outbox pattern comes in.

The Outbox Pattern

The idea of this approach is to have an "outbox" table in the service’s database. When receiving a request for placing a purchase order, not only an INSERT into the PurchaseOrder table is done, but, as part of the same transaction, also a record representing the event to be sent is inserted into that outbox table.

The record describes an event that happened in the service, for instance it could be a JSON structure representing the fact that a new purchase order has been placed, comprising data on the order itself, its order lines as well as contextual information such as a use case identifier. By explicitly emitting events via records in the outbox table, it can be ensured that events are structured in a way suitable for external consumers. This also helps to make sure that event consumers won’t break when for instance altering the internal domain model or the PurchaseOrder table.

An asynchronous process monitors that table for new entries. If there are any, it propagates the events as messages to Apache Kafka. This gives us a very nice balance of characteristics: By synchronously writing to the PurchaseOrder table, the source service benefits from "read your own writes" semantics. A subsequent query for purchase orders will return the newly persisted order, as soon as that first transaction has been committed. At the same time, we get reliable, asynchronous, eventually consistent data propagation to other services via Apache Kafka.

Now, the outbox pattern isn’t actually a new idea. It has been in use for quite some time. In fact, even when using JMS-style message brokers, which actually could participate in distributed transactions, it can be a preferable option for avoiding any coupling to, and potential impact from downtimes of, remote resources such as a message broker. You can also find a description of the pattern on Chris Richardson’s excellent microservices.io site.

Nevertheless, the pattern gets much less attention than it deserves and it is especially useful in the context of microservices. As we’ll see, the outbox pattern can be implemented in a very elegant and efficient way using change data capture and Debezium. In the following, let’s explore how.

An Implementation Based on Change Data Capture

Log-based Change Data Capture (CDC) is a great fit for capturing new entries in the outbox table and streaming them to Apache Kafka. As opposed to any polling-based approach, event capture happens with very low overhead in near-realtime. Debezium comes with CDC connectors for several databases such as MySQL, Postgres and SQL Server. The following example will use the Debezium connector for Postgres.

You can find the complete source code of the example on GitHub. Refer to the README.md for details on building and running the example code. The example is centered around two microservices, order-service and shipment-service. Both are implemented in Java, using CDI as the component model and JPA/Hibernate for accessing their respective databases. The order service runs on WildFly and exposes a simple REST API for placing purchase orders and canceling specific order lines. It uses a Postgres database as its local data store. The shipment service is based on Thorntail; via Apache Kafka, it receives events exported by the order service and creates corresponding shipment entries in its own MySQL database. Debezium tails the transaction log ("write-ahead log", WAL) of the order service’s Postgres database in order to capture any new events in the outbox table and propagates them to Apache Kafka.

The overall architecture of the solution can be seen in the following picture:

Outbox Pattern Overview

Note that the pattern is in no way tied to these specific implementation choices. It could equally well be realized using alternative technologies such as Spring Boot (e.g. leveraging Spring Data’s support for domain events), plain JDBC or other programming languages than Java altogether.

Now let’s take a closer look at some of the relevant components of the solution.

The Outbox Table

The outbox table resides in the database of the order service and has the following structure:

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  | not null

Its columns are these:

  • id: unique id of each message; can be used by consumers to detect any duplicate events, e.g. when restarting to read messages after a failure. Generated when creating a new event.

  • aggregatetype: the type of the aggregate root to which a given event is related; the idea being, leaning on the same concept of domain-driven design, that exported events should refer to an aggregate ("a cluster of domain objects that can be treated as a single unit"), where the aggregate root provides the sole entry point for accessing any of the entities within the aggregate. This could for instance be "purchase order" or "customer".

    This value will be used to route events to corresponding topics in Kafka, so there’d be a topic for all events related to purchase orders, one topic for all customer-related events etc. Note that also events pertaining to a child entity contained within one such aggregate should use that same type. So e.g. an event representing the cancelation of an individual order line (which is part of the purchase order aggregate) should also use the type of its aggregate root, "order", ensuring that also this event will go into the "order" Kafka topic.

  • aggregateid: the id of the aggregate root that is affected by a given event; this could for instance be the id of a purchase order or a customer id; similar to the aggregate type, events pertaining to a sub-entity contained within an aggregate should use the id of the containing aggregate root, e.g. the purchase order id for an order line cancelation event. This id will be used as the key for Kafka messages later on. That way, all events pertaining to one aggregate root or any of its contained sub-entities will go into the same partition of that Kafka topic, which ensures that consumers of that topic will consume all the events related to one and the same aggregate in the exact order as they were produced.

  • type: the type of event, e.g. "Order Created" or "Order Line Canceled". Allows consumers to trigger suitable event handlers.

  • payload: a JSON structure with the actual event contents, e.g. containing a purchase order, information about the purchaser, contained order lines, their price etc.
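
For illustration, a hypothetical OutboxEvent JPA entity matching this table might look as follows; the actual class in the example repository may differ, e.g. in how the id is generated and how the jsonb payload column is mapped:

import java.util.UUID;

import javax.persistence.Entity;
import javax.persistence.Id;

import com.fasterxml.jackson.databind.JsonNode;

@Entity
public class OutboxEvent {

    @Id
    private UUID id = UUID.randomUUID(); // generated when creating a new event

    private String aggregatetype;
    private String aggregateid;
    private String type;

    // the JSON event contents; stored as a plain string here for simplicity,
    // a real jsonb mapping would require a custom Hibernate type
    private String payload;

    OutboxEvent() {
    }

    public OutboxEvent(String aggregateType, String aggregateId, String type, JsonNode payload) {
        this.aggregatetype = aggregateType;
        this.aggregateid = aggregateId;
        this.type = type;
        this.payload = payload.toString();
    }
}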

Sending Events to the Outbox

In order to "send" events to the outbox, code in the order service could in general just do an INSERT into the outbox table. However, it’s a good idea to go for a slightly more abstract API, allowing you to adjust implementation details of the outbox more easily later on, if needed. CDI events come in very handy for this. They can be raised in the application code and will be processed synchronously by the outbox event sender, which will do the required INSERT into the outbox table.

All outbox event types should implement the following contract, resembling the structure of the outbox table shown before:

public interface ExportedEvent {

    String getAggregateId();
    String getAggregateType();
    JsonNode getPayload();
    String getType();
}

To produce such an event, application code uses an injected Event instance, as e.g. here in the OrderService class:

@ApplicationScoped
public class OrderService {

    @PersistenceContext
    private EntityManager entityManager;

    @Inject
    private Event<ExportedEvent> event;

    @Transactional
    public PurchaseOrder addOrder(PurchaseOrder order) {
        order = entityManager.merge(order);

        event.fire(OrderCreatedEvent.of(order));
        event.fire(InvoiceCreatedEvent.of(order));

        return order;
    }

    @Transactional
    public PurchaseOrder updateOrderLine(long orderId, long orderLineId,
            OrderLineStatus newStatus) {
        // ...
    }
}

In the addOrder() method, the JPA entity manager is used to persist the incoming order in the database, and the injected event is used to fire a corresponding OrderCreatedEvent and an InvoiceCreatedEvent. Again, keep in mind that, despite the notion of "event", these two things happen within one and the same transaction, i.e. within this transaction, three records will be inserted into the database: one in the table with purchase orders and two in the outbox table.

Actual event implementations are straightforward; as an example, here’s the OrderCreatedEvent class:

public class OrderCreatedEvent implements ExportedEvent {

    private static ObjectMapper mapper = new ObjectMapper();

    private final long id;
    private final JsonNode order;

    private OrderCreatedEvent(long id, JsonNode order) {
        this.id = id;
        this.order = order;
    }

    public static OrderCreatedEvent of(PurchaseOrder order) {
        ObjectNode asJson = mapper.createObjectNode()
                .put("id", order.getId())
                .put("customerId", order.getCustomerId())
                .put("orderDate", order.getOrderDate().toString());

        ArrayNode items = asJson.putArray("lineItems");

        for (OrderLine orderLine : order.getLineItems()) {
            items.add(
                    mapper.createObjectNode()
                            .put("id", orderLine.getId())
                            .put("item", orderLine.getItem())
                            .put("quantity", orderLine.getQuantity())
                            .put("totalPrice", orderLine.getTotalPrice())
                            .put("status", orderLine.getStatus().name())
            );
        }

        return new OrderCreatedEvent(order.getId(), asJson);
    }

    @Override
    public String getAggregateId() {
        return String.valueOf(id);
    }

    @Override
    public String getAggregateType() {
        return "Order";
    }

    @Override
    public String getType() {
        return "OrderCreated";
    }

    @Override
    public JsonNode getPayload() {
        return order;
    }
}

Note how Jackson’s ObjectMapper is used to create a JSON representation of the event’s payload.

Now let’s take a look at the code that consumes any fired ExportedEvent and does the corresponding write to the outbox table:

@ApplicationScoped
public class EventSender {

    @PersistenceContext
    private EntityManager entityManager;

    public void onExportedEvent(@Observes ExportedEvent event) {
        OutboxEvent outboxEvent = new OutboxEvent(
                event.getAggregateType(),
                event.getAggregateId(),
                event.getType(),
                event.getPayload()
        );

        entityManager.persist(outboxEvent);
        entityManager.remove(outboxEvent);
    }
}

It’s rather simple: for each event the CDI runtime will invoke the onExportedEvent() method. An instance of the OutboxEvent entity is persisted in the database — and removed right away!

This might be surprising at first. But it makes sense when remembering how log-based CDC works: it doesn’t examine the actual contents of the table in the database, but instead it tails the append-only transaction log. The calls to persist() and remove() will create an INSERT and a DELETE entry in the log once the transaction commits. After that, Debezium will process these events: for any INSERT, a message with the event’s payload will be sent to Apache Kafka. DELETE events on the other hand can be ignored, as the removal from the outbox table is a mere technicality that doesn’t require any propagation to the message broker. So we are able to capture the event added to the outbox table by means of CDC, but when looking at the contents of the table itself, it will always be empty. This means that no additional disk space is needed for the table (apart from the log file elements which will automatically be discarded at some point) and also no separate house-keeping process is required to stop it from growing indefinitely.

Registering the Debezium Connector

With the outbox implementation in place, it’s time to register the Debezium Postgres connector, so it can capture any new events in the outbox table and relay them to Apache Kafka. That can be done by POST-ing the following JSON request to the REST API of Kafka Connect:

{
    "name": "outbox-connector",
    "config": {
        "connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max" : "1",
        "database.hostname" : "order-db",
        "database.port" : "5432",
        "database.user" : "postgresuser",
        "database.password" : "postgrespw",
        "database.dbname" : "orderdb",
        "database.server.name" : "dbserver1",
        "schema.whitelist" : "inventory",
        "table.whitelist" : "inventory.outboxevent",
        "tombstones.on.delete" : "false",
        "transforms" : "router",
        "transforms.router.type" : "io.debezium.examples.outbox.routingsmt.EventRouter"
    }
}

This sets up an instance of io.debezium.connector.postgresql.PostgresConnector, capturing changes from the specified Postgres instance. Note that by means of a table whitelist, only changes from the outboxevent table are captured. It also applies a single message transform (SMT) named EventRouter.

Deletion of Events from Kafka Topics

By setting the tombstones.on.delete option to false, no deletion markers ("tombstones") will be emitted by the connector when an event record gets deleted from the outbox table. That makes sense, as the deletion from the outbox table shouldn’t affect the retention of events in the corresponding Kafka topics. Instead, a specific retention time for the event topics may be configured in Kafka, e.g. to retain all purchase order events for 30 days.
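
As an illustration, such a retention time could be set on the OrderEvents topic using Kafka’s AdminClient. This is only a sketch; it assumes Kafka clients 2.3 or newer (for incrementalAlterConfigs) and the broker address used elsewhere in this example:

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionConfig {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "OrderEvents");

            // retain purchase order events for 30 days
            ConfigEntry retention = new ConfigEntry(
                    "retention.ms", String.valueOf(30L * 24 * 60 * 60 * 1000));

            admin.incrementalAlterConfigs(
                    Map.of(topic, List.of(new AlterConfigOp(retention, AlterConfigOp.OpType.SET))))
                 .all()
                 .get();
        }
    }
}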

Alternatively, one could work with compacted topics. This would require some changes to the design of events in the outbox table:

  • they must describe the entire aggregate; so for instance an event representing the cancelation of a single order line should also describe the complete current state of the containing purchase order; that way consumers will be able to obtain the entire state of the purchase order even when only seeing the last event pertaining to a given order, after log compaction has run.

  • they must have one more boolean attribute indicating whether a particular event represents the deletion of the event’s aggregate root. Such an event (e.g. of type OrderDeleted) could then be used by the event routing SMT described in the next section to produce a deletion marker for that aggregate root. Log compaction would then remove all events pertaining to the given purchase order when its OrderDeleted event has been written to the topic.

Naturally, when deleting events, the event stream will no longer be re-playable from its very beginning. Depending on the specific business requirements, it might be sufficient to just keep the final state of a given purchase order, customer etc. This could be achieved using compacted topics and a sufficiently large value for the topic’s delete.retention.ms setting. Another option could be to move historic events to some sort of cold storage (e.g. an Amazon S3 bucket), from where they can be retrieved if needed, followed by reading the latest events from the Kafka topics. Which approach to follow depends on the specific requirements, expected amount of data and expertise in the team developing and operating the solution.

Topic Routing

By default, the Debezium connectors will send all change events originating from one given table to the same topic, i.e. we’d end up with a single Kafka topic named dbserver1.inventory.outboxevent which would contain all events, be it order events, customer events etc.

To simplify the implementation of consumers which are only interested in specific event types, it makes more sense, though, to have multiple topics, e.g. OrderEvents, CustomerEvents and so on. For instance, the shipment service might not be interested in any customer events. By only subscribing to the OrderEvents topic, it will be sure to never receive any customer events.

In order to route the change events captured from the outbox table to different topics, that custom SMT EventRouter is used. Here is the code of its apply() method, which will be invoked by Kafka Connect for each record emitted by the Debezium connector:

@Override
public R apply(R record) {
    // Ignoring tombstones just in case
    if (record.value() == null) {
        return record;
    }

    Struct struct = (Struct) record.value();
    String op = struct.getString("op");

    // ignoring deletions in the outbox table
    if (op.equals("d")) {
        return null;
    }
    else if (op.equals("c")) {
        Long timestamp = struct.getInt64("ts_ms");
        Struct after = struct.getStruct("after");

        String key = after.getString("aggregateid");
        String topic = after.getString("aggregatetype") + "Events";

        String eventId = after.getString("id");
        String eventType = after.getString("type");
        String payload = after.getString("payload");

        Schema valueSchema = SchemaBuilder.struct()
            .field("eventType", after.schema().field("type").schema())
            .field("ts_ms", struct.schema().field("ts_ms").schema())
            .field("payload", after.schema().field("payload").schema())
            .build();

        Struct value = new Struct(valueSchema)
            .put("eventType", eventType)
            .put("ts_ms", timestamp)
            .put("payload", payload);

        Headers headers = record.headers();
        headers.addString("eventId", eventId);

        return record.newRecord(topic, null, Schema.STRING_SCHEMA, key, valueSchema, value,
                record.timestamp(), headers);
    }
    // not expecting update events, as the outbox table is "append only",
    // i.e. event records will never be updated
    else {
        throw new IllegalArgumentException("Record of unexpected op type: " + record);
    }
}

When receiving a delete event (op = d), it will discard that event, as the deletion of event records from the outbox table is not relevant to downstream consumers. Things get more interesting when receiving a create event (op = c). Such a record will be propagated to Apache Kafka.

Debezium’s change events have a complex structure that contains the old (before) and new (after) state of the represented row. The event structure to propagate is obtained from the after state. The aggregatetype value from the captured event record is used to build the name of the topic to send the event to. For instance, events with aggregatetype set to Order will be sent to the OrderEvents topic. aggregateid is used as the message key, making sure all messages of that aggregate will go into the same partition of that topic. The message value is a structure comprising the original event payload (encoded as JSON), the timestamp indicating when the event was produced and the event type. Finally, the event UUID is propagated as a Kafka header field. This allows for efficient duplicate detection by consumers, without having to examine the actual message contents.

Events in Apache Kafka

Now let’s take a look into the OrderEvents and CustomerEvents topics.

If you have checked out the example sources and started all the components via Docker Compose (see the README.md file in the example project for more details), you can place purchase orders via the order service’s REST API like so:

cat resources/data/create-order-request.json | http POST http://localhost:8080/order-service/rest/orders

Similarly, specific order lines can be canceled:

cat resources/data/cancel-order-line-request.json | http PUT http://localhost:8080/order-service/rest/orders/1/lines/2

When using a tool such as the very practical kafkacat utility, you should now see messages like these in the OrderEvents topic:

kafkacat -b kafka:9092 -C -o beginning -f 'Headers: %h\nKey: %k\nValue: %s\n' -q -t OrderEvents
Headers: eventId=d03dfb18-8af8-464d-890b-09eb8b2dbbdd
Key: "4"
Value: {"eventType":"OrderCreated","ts_ms":1550307598558,"payload":"{\"id\": 4, \"lineItems\": [{\"id\": 7, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 8, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"}
Headers: eventId=49f89ea0-b344-421f-b66f-c635d212f72c
Key: "4"
Value: {"eventType":"OrderLineUpdated","ts_ms":1550308226963,"payload":"{\"orderId\": 4, \"newStatus\": \"CANCELLED\", \"oldStatus\": \"ENTERED\", \"orderLineId\": 7}"}

The payload field within the message value is the stringified JSON representation of the original event. The Debezium Postgres connector emits JSONB columns as a string (using the io.debezium.data.Json logical type name), which is why the quotes are escaped. The jq utility, and more specifically its fromjson operator, comes in handy for displaying the event payload in a more readable way:

kafkacat -b kafka:9092 -C -o beginning -t OrderEvents | jq '.payload | fromjson'
{
  "id": 4,
  "lineItems": [
    {
      "id": 7,
      "item": "Debezium in Action",
      "status": "ENTERED",
      "quantity": 2,
      "totalPrice": 39.98
    },
    {
      "id": 8,
      "item": "Debezium for Dummies",
      "status": "ENTERED",
      "quantity": 1,
      "totalPrice": 29.99
    }
  ],
  "orderDate": "2019-01-31T12:13:01",
  "customerId": 123
}
{
  "orderId": 4,
  "newStatus": "CANCELLED",
  "oldStatus": "ENTERED",
  "orderLineId": 7
}

You can also take a look at the CustomerEvents topic to inspect the events representing the creation of an invoice when a purchase order is added.

Duplicate Detection in the Consuming Service

At this point, our implementation of the outbox pattern is fully functional; when the order service receives a request to place an order (or cancel an order line), it will persist the corresponding state in the purchaseorder and orderline tables of its database. At the same time, within the same transaction, corresponding event entries will be added to the outbox table in the same database. The Debezium Postgres connector captures any insertions into that table and routes the events into the Kafka topic corresponding to the aggregate type represented by a given event.

To wrap things up, let’s explore how another microservice such as the shipment service can consume these messages. The entry point into that service is a regular Kafka consumer implementation, which is not too exciting and hence omitted here for the sake of brevity (a sketch of such a loop follows further below). You can find its source code in the example repository. For each incoming message on the OrderEvents topic, the consumer calls the OrderEventHandler:

@ApplicationScoped
public class OrderEventHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderEventHandler.class);

    @Inject
    private MessageLog log;

    @Inject
    private ShipmentService shipmentService;

    @Transactional
    public void onOrderEvent(UUID eventId, String key, String event) {
        if (log.alreadyProcessed(eventId)) {
            LOGGER.info("Event with UUID {} was already retrieved, ignoring it", eventId);
            return;
        }

        JsonObject json = Json.createReader(new StringReader(event)).readObject();
        JsonObject payload = json.containsKey("schema") ? json.getJsonObject("payload") : json;

        String eventType = payload.getString("eventType");
        Long ts = payload.getJsonNumber("ts_ms").longValue();
        String eventPayload = payload.getString("payload");

        JsonReader payloadReader = Json.createReader(new StringReader(eventPayload));
        JsonObject payloadObject = payloadReader.readObject();

        if (eventType.equals("OrderCreated")) {
            shipmentService.orderCreated(payloadObject);
        }
        else if (eventType.equals("OrderLineUpdated")) {
            shipmentService.orderLineUpdated(payloadObject);
        }
        else {
            LOGGER.warn("Unknown event type");
        }

        log.processed(eventId);
    }
}

The first thing done by onOrderEvent() is to check whether the event with the given UUID has been processed before. If so, any further calls for that same event will be ignored. This is to prevent any duplicate processing of events caused by the "at least once" semantics of this data pipeline. For instance it could happen that the Debezium connector or the consuming service fail before acknowledging the retrieval of a specific event with the source database or the messaging broker, respectively. In that case, after a restart of Debezium or the consuming service, a few events may be processed a second time. Propagating the event UUID as a Kafka message header allows for an efficient detection and exclusion of duplicates in the consumer.

If a message is received for the first time, the message value is parsed and the ShipmentService business method corresponding to the specific event type is invoked with the event payload. Finally, the message is marked as processed with the message log.

This MessageLog simply keeps track of all consumed events in a table within the service’s local database:

@ApplicationScoped
public class MessageLog {

    @PersistenceContext
    private EntityManager entityManager;

    @Transactional(value=TxType.MANDATORY)
    public void processed(UUID eventId) {
        entityManager.persist(new ConsumedMessage(eventId, Instant.now()));
    }

    @Transactional(value=TxType.MANDATORY)
    public boolean alreadyProcessed(UUID eventId) {
        return entityManager.find(ConsumedMessage.class, eventId) != null;
    }
}

That way, should the transaction be rolled back for some reason, the original message will also not be marked as processed, and an exception will bubble up to the Kafka event consumer loop. This allows the message to be retried later on.

Note that a more complete implementation should take care of retrying given messages only a certain number of times before re-routing any unprocessable messages to a dead-letter queue or similar. There should also be some house-keeping on the message log table; periodically, all events older than the consumer’s current offset committed with the broker may be deleted, as it’s ensured that such messages won’t be propagated to the consumer again.
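
For reference, the consumer loop omitted above might look roughly like the following sketch; the configuration values and the way the handler instance is obtained are assumptions here, see the example repository for the actual implementation:

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OrderEventConsumerLoop {

    private final OrderEventHandler orderEventHandler;

    public OrderEventConsumerLoop(OrderEventHandler orderEventHandler) {
        this.orderEventHandler = orderEventHandler;
    }

    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("group.id", "shipment-service");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("OrderEvents"));

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // the event UUID travels as a Kafka message header, added by the routing SMT
                    UUID eventId = UUID.fromString(new String(
                            record.headers().lastHeader("eventId").value(), StandardCharsets.UTF_8));

                    orderEventHandler.onOrderEvent(eventId, record.key(), record.value());
                }
            }
        }
    }
}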

Summary

The outbox pattern is a great way for propagating data amongst different microservices.

By only modifying a single resource - the source service’s own database - it avoids any potential inconsistencies of altering multiple resources at the same time which do not share one common transactional context (the database and Apache Kafka). By writing to the database first, the source service has instant "read your own writes" semantics, which is important for a consistent user experience, allowing query methods invoked after a write to instantly reflect any data changes.

At the same time, the pattern enables asynchronous event propagation to other microservices. Apache Kafka acts as a highly scalable and reliable backbone for the messaging amongst the services. Given the right topic retention settings, new consumers may come up long after an event has been originally produced, and build up their own local state based on the event history.

Putting Apache Kafka into the center of the overall architecture also ensures a decoupling of involved services. If for instance single components of the solution fail or are not available for some time, e.g. during an update, events will simply be processed later on: after a restart, the Debezium connector will continue to tail the outbox table from the point where it left off before. Similarly, any consumer will continue to process topics from its previous offset. By keeping track of already successfully processed messages, duplicates can be detected and excluded from repeated handling.

Naturally, such an event pipeline between different services is eventually consistent, i.e. consumers such as the shipping service may lag a bit behind producers such as the order service. Usually, that’s just fine, though, and can be handled in terms of the application’s business logic. For instance, there will typically be no need to create a shipment within the very same second as an order has been placed. Also, end-to-end delays of the overall solution are typically low (seconds or even sub-second range), thanks to log-based change data capture which allows for emission of events in near-realtime.

One last thing to keep in mind is that the structure of the events exposed via the outbox should be considered a part of the emitting service’s API, i.e. when needed, their structure should be adjusted carefully and with compatibility considerations in mind. This ensures that consumers are not accidentally broken when upgrading the producing service. At the same time, consumers should be lenient when handling messages and, for instance, not fail when encountering unknown attributes within received events.

Many thanks to Hans-Peter Grahsl, Jiri Pechanec, Justin Holmes and René Kerner for their feedback while writing this post!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Automating Cache Invalidation With Change Data Capture

The second-level cache of Hibernate ORM / JPA is a proven and efficient way to increase application performance: caching read-only or rarely modified entities avoids roundtrips to the database, resulting in improved response times of the application.

Unlike the first-level cache, the second-level cache is associated with the session factory (or entity manager factory in JPA terms), so its contents are shared across transactions and concurrent sessions. Naturally, if a cached entity gets modified, the corresponding cache entry must be updated (or purged from the cache), too. As long as the data changes are done through Hibernate ORM, this is nothing to worry about: the ORM will update the cache automatically.

Things get tricky, though, when bypassing the application, e.g. when modifying records directly in the database. Hibernate ORM then has no way of knowing that the cached data has become stale, and it’s necessary to invalidate the affected items explicitly. A common way of doing so is to provide some admin functionality that allows clearing an application’s caches. For this to work, it’s vital to not forget about calling that invalidation functionality, or the application will keep working with outdated cached data.
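
Such admin functionality might be as simple as a REST endpoint invoking JPA’s Cache API; a hypothetical sketch (path and class name are made up for illustration):

import javax.persistence.EntityManagerFactory;
import javax.persistence.PersistenceUnit;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

// Hypothetical admin resource; someone has to remember to invoke it
// after every external data change
@Path("/admin/caches")
public class CacheAdminResource {

    @PersistenceUnit
    private EntityManagerFactory emf;

    @POST
    @Path("/clear")
    public void clearCaches() {
        // JPA Cache API: evict all entries from the second-level cache
        emf.getCache().evictAll();
    }
}

The fragility lies not in the code, but in the process: this endpoint must be called manually after each external change, or stale data will linger.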

In the following we’re going to explore an alternative approach for cache invalidation, which works in a reliable and fully automated way: by employing Debezium and its change data capture (CDC) capabilities, you can track data changes in the database itself and react to any applied change. This makes it possible to invalidate affected cache entries in near-realtime, without the risk of stale data due to missed changes. If an entry has been evicted from the cache, Hibernate ORM will load the latest version of the entity from the database the next time it is requested.

The Example Application

As an example, consider this simple model of two entities, PurchaseOrder and Item:

Example domain model

A purchase order represents the order of an item, where its total price is the ordered quantity times the item’s base price.

Source Code

The source code of this example is provided on GitHub. If you want to follow along and try out all the steps described in the following, clone the repo and follow the instructions in README.md for building the project.

Modelling order and item as JPA entities is straightforward:

@Entity
public class PurchaseOrder {

    @Id
    @GeneratedValue(generator = "sequence")
    @SequenceGenerator(
        name = "sequence", sequenceName = "seq_po", initialValue = 1001, allocationSize = 50
    )
    private long id;
    private String customer;
    @ManyToOne private Item item;
    private int quantity;
    private BigDecimal totalPrice;

    // ...
}
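
For illustration, the total price described above might be derived in the entity like so; this is just a sketch, assuming a getPrice() accessor on Item (the actual example code may differ):

    public PurchaseOrder(String customer, Item item, int quantity) {
        this.customer = customer;
        this.item = item;
        this.quantity = quantity;
        // total price = ordered quantity times the item's base price;
        // getPrice() is the assumed accessor for Item#price
        this.totalPrice = item.getPrice().multiply(BigDecimal.valueOf(quantity));
    }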

As changes to items are rare, the Item entity should be cached. This can be done by simply specifying JPA’s @Cacheable annotation:

@Entity
@Cacheable
public class Item {

    @Id
    private long id;
    private String description;
    private BigDecimal price;

    // ...
}

You also need to enable the second-level cache in the META-INF/persistence.xml file. The property hibernate.cache.use_second_level_cache activates the cache itself, and the ENABLE_SELECTIVE cache mode causes only those entities to be put into the cache which are annotated with @Cacheable. It’s also a good idea to enable SQL query logging and cache access statistics. That way you’ll be able to verify whether things work as expected by examining the application log:

<?xml version="1.0" encoding="utf-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="..."
    version="2.2">

    <persistence-unit name="orders-PU-JTA" transaction-type="JTA">
        <jta-data-source>java:jboss/datasources/OrderDS</jta-data-source>
        <shared-cache-mode>ENABLE_SELECTIVE</shared-cache-mode>
        <properties>
            <property name="hibernate.cache.use_second_level_cache" value="true" />

            <property name="hibernate.show_sql" value="true" />
            <property name="hibernate.format_sql" value="true" />
            <property name="hibernate.generate_statistics" value="true" />

            <!-- dialect etc. ... -->
        </properties>
    </persistence-unit>
</persistence>

When running on a Java EE application server (or Jakarta EE, as the stack is called since its donation to the Eclipse Foundation), that’s all you need to enable second-level caching. In the case of WildFly (which is what’s used in the example project), the Infinispan key/value store is used as the cache provider by default.

Now try and see what happens when modifying an item’s price by running some SQL in the database, bypassing the application layer. If you’ve checked out the example source code, comment out the DatabaseChangeEventListener class and start the application as described in the README.md. You can then place purchase orders using curl like this (a couple of example items have been persisted at application start-up):

> curl -H "Content-Type: application/json" \
  -X POST \
  --data '{ "customer" : "Billy-Bob", "itemId" : 10003, "quantity" : 2 }' \
  http://localhost:8080/cache-invalidation/rest/orders
{
    "id" : 1002,
    "customer" : "Billy-Bob",
    "item" : {
        "id" :10003,
        "description" : "North By Northwest",
        "price" : 14.99
    },
    "quantity" : 2,
    "totalPrice" : 29.98
}

The response is the expected one, as the item price is 14.99. Now update the item’s price directly in the database. The example uses Postgres, so you can use the psql CLI utility to do so:

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"'

Placing another purchase order for the same item using curl, you’ll see that the calculated total price doesn’t reflect the update. Not good! But it’s not too surprising, given that the price update was applied completely bypassing the application layer and Hibernate ORM.

The Change Event Handler

Now let’s explore how to use Debezium and CDC to react to changes in the item table and invalidate corresponding cache entries.

While Debezium is most often deployed into Kafka Connect (thus streaming change events into Apache Kafka topics), it has another mode of operation that comes in very handy for the use case at hand. Using the embedded engine, you can run the Debezium connectors as a library directly within your application. For each change event received from the database, a configured callback method will be invoked, which in the case at hand will evict the affected item from the second-level cache.

The following picture shows the design of this approach:

Architecture Overview

While this doesn’t come with the scalability and fault tolerance provided by Apache Kafka, it nicely fits the given requirements. As the second-level cache is bound to the application lifecycle, there is for instance no need for the offset management and restarting capabilities provided by the Kafka Connect framework. For the given use case it is enough to receive data change events while the application is running, and using the embedded engine enables exactly that.

Clustered Applications

Note that it still might make sense to use Apache Kafka and the regular deployment of Debezium into Kafka Connect when running a clustered application where each node has a local cache. Instead of registering a connector on each node, Kafka and Connect would allow you to deploy a single connector instance and have the application nodes listen to the topic(s) with the change events. This would result in less resource utilization in the database.
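
Such a node-local listener could be as simple as a plain Kafka consumer that evicts items from the local cache. The following is a hypothetical sketch, not part of the example project; broker address, group id, and key parsing are assumptions:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import javax.persistence.EntityManagerFactory;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical node-local listener: every application node consumes the
// change event topic and evicts entries from its own second-level cache
public class ItemChangeTopicListener implements Runnable {

    private final EntityManagerFactory emf;
    private volatile boolean running = true;

    public ItemChangeTopicListener(EntityManagerFactory emf) {
        this.emf = emf;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");        // assumed broker address
        props.put("group.id", "cache-invalidation-node-1");  // must be unique per node
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("dbserver1.public.item"));

            while (running) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // parse the item id from the event key and evict the entry
                    emf.getCache().evict(Item.class, extractItemId(record.key()));
                }
            }
        }
    }

    public void stop() {
        running = false;
    }

    private long extractItemId(String key) {
        // naive placeholder: pull the numeric id out of the JSON key struct
        return Long.parseLong(key.replaceAll("\\D+", ""));
    }
}

Note that each node must use a distinct consumer group id, so that every node receives every change event rather than sharing them among a group.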

Having added the dependencies of the Debezium embedded engine (io.debezium:debezium-embedded:0.9.0.Beta1) and the Debezium Postgres connector (io.debezium:debezium-connector-postgres:0.9.0.Beta1) to your project, a class DatabaseChangeEventListener for listening to any changes in the database can be implemented like this:

@ApplicationScoped
public class DatabaseChangeEventListener {

    @Resource
    private ManagedExecutorService executorService;

    @PersistenceUnit private EntityManagerFactory emf;

    @PersistenceContext
    private EntityManager em;

    private EmbeddedEngine engine;

    public void startEmbeddedEngine(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Configuration config = Configuration.empty()
                .withSystemProperties(Function.identity()).edit()
                .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
                .with(EmbeddedEngine.ENGINE_NAME, "cache-invalidation-engine")
                .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class)
                .with("name", "cache-invalidation-connector")
                .with("database.hostname", "postgres")
                .with("database.port", 5432)
                .with("database.user", "postgresuser")
                .with("database.password", "postgrespw")
                .with("database.server.name", "dbserver1")
                .with("database.dbname", "inventory")
                .with("database.whitelist", "public")
                .with("snapshot.mode", "never")
                .build();

        this.engine = EmbeddedEngine.create()
                .using(config)
                .notifying(this::handleDbChangeEvent)
                .build();

        executorService.execute(engine);
    }

    @PreDestroy
    public void shutdownEngine() {
        engine.stop();
    }

    private void handleDbChangeEvent(SourceRecord record) {
        if (record.topic().equals("dbserver1.public.item")) {
            Long itemId = ((Struct) record.key()).getInt64("id");
            Struct payload = (Struct) record.value();
            Operation op = Operation.forCode(payload.getString("op"));

            if (op == Operation.UPDATE || op == Operation.DELETE) {
                emf.getCache().evict(Item.class, itemId);
            }
        }
    }
}

Upon application start-up, this configures an instance of the Debezium Postgres connector and sets up the embedded engine for running the connector. The connector options (host name, credentials etc.) are mostly the same as when deploying the connector into Kafka Connect. There is no need for doing an initial snapshot of the existing data, hence the snapshot mode is set to "never".

The offset storage option is used for controlling how connector offsets should be persisted. As it’s not necessary to process any change events occurring while the connector is not running (instead you’d just begin to read the log from the current location after the restart), the in-memory implementation provided by Kafka Connect is used.

Once configured, the embedded engine must be run via an Executor instance. As the example runs in WildFly, a managed executor can simply be obtained through @Resource injection for that purpose (see JSR 236).

The embedded engine is configured to invoke the handleDbChangeEvent() method for each received data change event. This method first checks whether the incoming event originates from the item table. If that’s the case, and if the change event represents an UPDATE or DELETE statement, the affected Item instance is evicted from the second-level cache. JPA 2.0 provides a simple API for this purpose, accessible via the EntityManagerFactory.

With the DatabaseChangeEventListener class in place, the cache entry will now automatically be evicted when doing another item update via psql. When placing the first purchase order for that item after the update, you’ll see in the application log how Hibernate ORM executes a query SELECT ... FROM item ... in order to load the item referenced by the order. Also the cache statistics will report one "L2C miss". Upon subsequent orders of that same item it will be obtained from the cache again.

Eventual Consistency

While the event handling happens in near-realtime, it’s important to point out that it still applies eventual consistency semantics. This means that there is a very short time window between the point in time where a transaction is committed and the point in time where the change event is streamed from the log to the event handler and the cache entry is invalidated.

Avoiding Cache Invalidations After Application-triggered Data Changes

The change event listener shown above satisfies the requirement of invalidating cached items after external data changes. But in its current form it evicts cache items a bit too aggressively: cached items will also be purged when updating an Item instance through the application itself. This is not only unnecessary (the cached item already is the current version), but even counter-productive: the superfluous cache evictions cause additional database roundtrips, resulting in longer response times.

It is therefore necessary to distinguish between data changes performed by the application itself and external data changes. Only in the latter case should the affected items be evicted from the cache. In order to do so, you can leverage the fact that each Debezium data change event contains the id of the originating transaction. Keeping track of all transactions run by the application itself makes it possible to trigger the cache eviction only for those items altered by external transactions.

Accounting for this change, the overall architecture looks like so:

Architecture Overview with Transaction Registry

The first thing to implement is the transaction registry, i.e. a class for the transaction book keeping:

@ApplicationScoped
public class KnownTransactions {

    private final DefaultCacheManager cacheManager;
    private final Cache<Long, Boolean> applicationTransactions;

    public KnownTransactions() {
        cacheManager = new DefaultCacheManager();
        cacheManager.defineConfiguration(
                "tx-id-cache",
                new ConfigurationBuilder()
                    .expiration()
                        .lifespan(60, TimeUnit.SECONDS)
                    .build()
                );

        applicationTransactions = cacheManager.getCache("tx-id-cache");
    }

    @PreDestroy
    public void stopCacheManager() {
        cacheManager.stop();
    }

    public void register(long txId) {
        applicationTransactions.put(txId, true);
    }

    public boolean isKnown(long txId) {
        return Boolean.TRUE.equals(applicationTransactions.get(txId));
    }
}

This uses the Infinispan DefaultCacheManager for creating and maintaining an in-memory cache of transaction ids encountered by the application. As data change events arrive in near-realtime, the TTL of the cache entries can be rather short (in fact, the value of one minute shown in the example is chosen very conservatively, usually events should be received within seconds).

The next step is to retrieve the current transaction id whenever a request is processed by the application and register it within KnownTransactions. This should happen once per transaction. There are multiple ways for implementing this logic; in the following a Hibernate ORM FlushEventListener is used for this purpose:

class TransactionRegistrationListener implements FlushEventListener {

    private volatile KnownTransactions knownTransactions;

    public TransactionRegistrationListener() {
    }

    @Override
    public void onFlush(FlushEvent event) throws HibernateException {
        event.getSession().getActionQueue().registerProcess( session -> {
            Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()")
                    .setFlushMode(FlushMode.MANUAL)
                    .getSingleResult();

            getKnownTransactions().register(txId.longValue());
        } );
    }

    private KnownTransactions getKnownTransactions() {
        KnownTransactions value = knownTransactions;

        if (value == null) {
            knownTransactions = value = CDI.current().select(KnownTransactions.class).get();
        }

        return value;
    }
}

As there’s no portable way to obtain the transaction id, this is done using a native SQL query. In the case of Postgres, the txid_current() function can be called for that. Hibernate ORM event listeners are not subject to dependency injection via CDI. Hence the static current() method is used to obtain a handle to the application’s CDI container and get a reference to the KnownTransactions bean.

This listener will be invoked whenever Hibernate ORM is synchronizing its persistence context with the database ("flushing"), which usually happens exactly once when the transaction is committed.

Manual Flushes

The session / entity manager can also be flushed manually, in which case the txid_current() function would be invoked multiple times. That’s neglected here for the sake of simplicity. The actual code in the example repo contains a slightly extended version of this class which makes sure that the transaction id is obtained only once.

To register the flush listener with Hibernate ORM, an Integrator implementation must be created and declared in the META-INF/services/org.hibernate.integrator.spi.Integrator file:

public class TransactionRegistrationIntegrator implements Integrator {

    @Override
    public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory,
            SessionFactoryServiceRegistry serviceRegistry) {
        serviceRegistry.getService(EventListenerRegistry.class)
            .appendListeners(EventType.FLUSH, new TransactionRegistrationListener());
    }

    @Override
    public void disintegrate(SessionFactoryImplementor sessionFactory,
            SessionFactoryServiceRegistry serviceRegistry) {
    }
}
The service file itself simply contains the fully-qualified class name of the integrator:

io.debezium.examples.cacheinvalidation.persistence.TransactionRegistrationIntegrator

During bootstrap, Hibernate ORM will detect the integrator class (by means of the Java service loader), invoke its integrate() method which in turn will register the listener class for the FLUSH event.

The last step is to exclude any events stemming from transactions run by the application itself in the database change event handler:

@ApplicationScoped
public class DatabaseChangeEventListener {

    // ...

    @Inject
    private KnownTransactions knownTransactions;

    private void handleDbChangeEvent(SourceRecord record) {
        if (record.topic().equals("dbserver1.public.item")) {
            Long itemId = ((Struct) record.key()).getInt64("id");
            Struct payload = (Struct) record.value();
            Operation op = Operation.forCode(payload.getString("op"));
            Long txId = ((Struct) payload.get("source")).getInt64("txId");

            if (!knownTransactions.isKnown(txId) &&
                    (op == Operation.UPDATE || op == Operation.DELETE)) {
                emf.getCache().evict(Item.class, itemId);
            }
        }
    }
}

And with that, you have all the pieces in place: cached Items will only be evicted after external data changes, but not after changes done by the application itself. To confirm, you can invoke the example’s items resource using curl:

> curl -H "Content-Type: application/json" \
  -X PUT \
  --data '{ "description" : "North by Northwest", "price" : 20.99}' \
  http://localhost:8080/cache-invalidation/rest/items/10003

When placing the next order for the item after this update, you should see that the Item entity is obtained from the cache, i.e. the change event will not have caused the item’s cache entry to be evicted. In contrast, if you update the item’s price via psql another time, the item should be removed from the cache and the order request will produce a cache miss, followed by a SELECT against the item table in the database.

Summary

In this blog post we’ve explored how Debezium and change data capture can be employed to invalidate application-level caches after external data changes. Compared to manual cache invalidation, this approach works very reliably (by capturing changes directly from the database log, no events will be missed) and fast (cache eviction happens in near-realtime after the data changes).

As you have seen, not too much glue code is needed in order to implement this. While the shown implementation is somewhat specific to the entities of the example, it should be possible to implement the change event handler in a more generic fashion, so that it can handle a set of configured entity types (essentially, the database change listener would have to convert the primary key field(s) from the change events into the primary key type of the corresponding entities in a generic way). Such a generic implementation would also have to provide the logic for obtaining the current transaction id for the most commonly used databases.

Please let us know whether you think this would be an interesting extension to have for Debezium and Hibernate ORM. For instance this could be a new module under the Debezium umbrella, and it could also be a great project to work on, should you be interested in contributing to Debezium. If you have any thoughts on this idea, please post a comment below or come to our mailing list.

Many thanks to Guillaume Smet, Hans-Peter Grahsl and Jiri Pechanec for their feedback while writing this post!



Materializing Aggregate Views With Hibernate and Debezium

Updating external full text search indexes (e.g. Elasticsearch) after data changes is a very popular use case for change data capture (CDC).

As we’ve discussed in a blog post a while ago, the combination of Debezium’s CDC source connectors and Confluent’s sink connector for Elasticsearch makes it straightforward to capture data changes in MySQL, Postgres etc. and push them towards Elasticsearch in near real-time. This results in a 1:1 relationship between tables in the source database and a corresponding search index in Elasticsearch, which is perfectly fine for many use cases.

It gets more challenging though if you’d like to put entire aggregates into a single index. An example could be a customer and all their addresses; those would typically be stored in two separate tables in an RDBMS, linked by a foreign key, whereas you’d like to have just one index in Elasticsearch, containing documents of customers with their addresses embedded, allowing you to efficiently search for customers based on their address.

Following up on the KStreams-based solution to this that we described recently, in this post we’d like to present an alternative for materializing such aggregate views, driven by the application layer.

Overview

The idea is to materialize views in a separate table in the source database, right in the moment the original data is altered.

Aggregates are serialized as JSON structures (which naturally can represent any nested object structure) and stored in a specific table. This is done within the actual transaction altering the data, which means the aggregate view is always consistent with the primary data. In particular, this approach isn’t prone to exposing intermediary aggregations, unlike the KStreams-based solution discussed in the post linked above.

The following picture shows the overall architecture:

Streaming Materialized Aggregate Views to Elasticsearch

Here the aggregate views are materialized by means of a small extension to Hibernate ORM, which stores the JSON aggregates within the source database (note that "aggregate views" are conceptually similar to the "materialized views" known from various RDBMS, in that they materialize the result of a "join" operation; technically, though, we’re not using that feature here, but a regular table). Changes to that aggregate table are then captured by Debezium and streamed to one topic per aggregate type. The Elasticsearch sink connector can subscribe to these topics and update corresponding full-text indexes.

You can find a proof-of-concept implementation (said Hibernate extension and related code) of this idea in our examples repository. Of course the general idea isn’t limited to Hibernate ORM or JPA, you could implement something similar with any other API you’re using to access your data.

Creating Aggregate Views via Hibernate ORM

For the following let’s assume we’re persisting a simple domain model (comprising a Customer entity and a few related ones like Address, (customer) Category etc.) in a database. Using Hibernate for that allows us to make the creation of aggregates fully transparent to the actual application code using a Hibernate event listener. Thanks to its extensible architecture, we can plug such listener into Hibernate just by adding it to the classpath, from where it will be picked up automatically when bootstrapping the entity manager / session factory.

Our example listener reacts to an annotation, @MaterializeAggregate, which marks those entity types that should be the roots of materialized aggregates.
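
The annotation itself boils down to a simple runtime-retained marker; a sketch of what it might look like (the actual definition can be found in the examples repository):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marks an entity type as the root of a materialized aggregate
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MaterializeAggregate {
    // name of the aggregate; also determines the name of the change event topic
    String aggregateName();
}

Applied to the Customer entity, the annotation is used like so: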

@Entity
@MaterializeAggregate(aggregateName="customers-complete")
public class Customer {

    @Id
    private long id;

    private String firstName;

    @OneToMany(mappedBy = "customer", fetch = FetchType.EAGER, cascade = CascadeType.ALL)
    private Set<Address> addresses;

    @ManyToOne
    private Category category;

    ...
}

Now if any entity annotated with @MaterializeAggregate is inserted, updated or deleted via Hibernate, the listener will kick in and materialize a JSON view of the aggregate root (customer) and its associated entities (addresses, category).
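
Conceptually, the listener boils down to something like the following heavily simplified sketch (updates only); the real implementation in the examples repository also covers inserts and deletions, derives the Kafka Connect schemas, and performs the actual write to the aggregate table:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.hibernate.event.spi.PostUpdateEvent;
import org.hibernate.event.spi.PostUpdateEventListener;
import org.hibernate.persister.entity.EntityPersister;

// Heavily simplified sketch of the materialization listener
public class AggregateMaterializationListener implements PostUpdateEventListener {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onPostUpdate(PostUpdateEvent event) {
        Object entity = event.getEntity();
        MaterializeAggregate materialize =
                entity.getClass().getAnnotation(MaterializeAggregate.class);

        if (materialize == null) {
            return; // not an aggregate root, nothing to do
        }

        try {
            // serialize the root and its reachable associations to JSON
            String json = mapper.writeValueAsString(entity);

            // upsert (rootType = materialize.aggregateName(), materialization = json)
            // into the aggregates table via event.getSession(), within the
            // running transaction; elided here
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean requiresPostCommitHanding(EntityPersister persister) {
        return false;
    }
}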

Under the hood the Jackson API is used for serializing the model into JSON. This means you can use any of its annotations to customize the JSON output, e.g. @JsonIgnore to exclude the inverse relationship from Address to Customer:

@Entity
public class Address {

    @Id
    private long id;

    @ManyToOne
    @JoinColumn(name = "customer_id")
    @JsonIgnore
    private Customer customer;

    private String street;

    private String city;

    ...
}

Note that Address itself isn’t marked with @MaterializeAggregate, i.e. it won’t be materialized into an aggregate view by itself.

After using JPA’s EntityManager to insert or update a few customers, let’s take a look at the aggregates table which has been populated by the listener (value schema omitted for the sake of brevity):

> select * from aggregates;

| rootType | keySchema | rootId | valueSchema | materialization |

| customers-complete

| {
  "schema" : {
    "type" : "struct",
    "fields" : [ {
      "type" : "int64",
      "optional" : false,
      "field" : "id"
    } ],
    "optional" : false,
    "name" : "customers-complete.Key"
  }
}

| { "id" : 1004 }

| { "schema" : { ... } }

| {
  "id" : 1004,
  "firstName" : "Anne",
  "lastName" : "Kretchmar",
  "email" : "annek@noanswer.org",
  "tags" : [ "long-term", "vip" ],
  "birthday" : 5098,
  "category" : {
    "id" : 100001,
    "name" : "Retail"
  },
  "addresses" : [ {
    "id" : 16,
    "street" : "1289 University Hill Road",
    "city" : "Canehill",
    "state" : "Arkansas",
    "zip" : "72717",
    "type" : "SHIPPING"
  } ]
} |

The table contains these columns:

  • rootType: The name of the aggregate as given in the @MaterializeAggregate annotation

  • rootId: The aggregate’s id as serialized JSON

  • materialization: The aggregate itself as serialized JSON; in this case a customer and their addresses, category etc.

  • keySchema: The Kafka Connect schema of the row’s key

  • valueSchema: The Kafka Connect schema of the materialization

Let’s talk about the two schema columns for a bit. JSON itself is quite limited as far as its supported data types are concerned. So for instance we’d lose information about a numeric field’s value range (int vs. long etc.) without any additional information. Therefore the listener derives the corresponding schema information for key and aggregate view from the entity model and stores it within the aggregate records.

Now Jackson itself only supports JSON Schema, which would be a bit too limited for our purposes. Hence the example implementation provides custom serializers for Jackson’s schema system, which allow us to emit Kafka Connect’s schema representation (with more precise type information) instead of plain JSON Schema. This will come in handy in the following when we’d like to expand the string-based JSON representations of key and value into properly typed Kafka Connect records.
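
As a small example, the key schema shown in the table above corresponds to the following Kafka Connect schema, which retains the exact int64 type of the id field; a sketch using Connect’s SchemaBuilder API:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Connect schema equivalent of the keySchema column shown above:
// the id field is precisely typed as int64, not just a JSON "number"
Schema keySchema = SchemaBuilder.struct()
        .name("customers-complete.Key")
        .field("id", Schema.INT64_SCHEMA)
        .build();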

Capturing Changes to the Aggregate Table

We now have a mechanism in place which transparently persists aggregates into a separate table within the source database, whenever the application data is changed through Hibernate. Note that this happens within the boundaries of the source transaction, so if that transaction is rolled back for some reason, the aggregate view won’t be updated either.

The Hibernate listener uses insert-or-update semantics when writing an aggregate view, i.e. for a given aggregate root there’ll always be exactly one corresponding entry in the aggregate table which reflects its current state. If an aggregate root entity is deleted, the listener will also drop the entry from the aggregate table.

So let’s set up Debezium now to capture any changes to the aggregates table:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<-EOF
  {
      "name": "inventory-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "debezium",
          "database.password": "dbz",
          "database.server.id": "184054",
          "database.server.name": "dbserver1",
          "database.whitelist": "inventory",
          "table.whitelist": ".*aggregates",
          "database.history.kafka.bootstrap.servers": "kafka:9092",
          "database.history.kafka.topic": "schema-changes.inventory"
      }
  }
EOF

This registers the MySQL connector with the "inventory" database (we’re using an expanded version of the schema from the Debezium tutorial), capturing any changes to the "aggregates" table.

Expanding JSON

If we now were to browse the corresponding Kafka topic, we’d see data change events in the known Debezium format for all the changes to the aggregates table.

The "materialization" field with the records' "after" state still is a single field containing a JSON string, though. What we’d rather like to have is a strongly typed Kafka Connect record, whose schema exactly describes the aggregate structure and the types of its fields. For that purpose the example project provides an SMT (single message transform) which takes the JSON materialization and the corresponding valueSchema and converts this into a full-blown Kafka Connect record. The same is done for keys. DELETE events are rewritten into tombstone events. Finally, the SMT re-routes every record to a topic named after the aggregate root, allowing consumers to subscribe just to changes to specific aggregate types.

So let’s add that SMT when registering the Debezium CDC connector:

...
"transforms":"expandjson",
"transforms.expandjson.type":"io.debezium.aggregation.smt.ExpandJsonSmt",
...
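
Conceptually, such an SMT implements Kafka Connect’s Transformation contract. The following skeleton only shows the re-routing part; the JSON expansion and tombstone handling performed by the actual ExpandJsonSmt from the examples repository are merely hinted at in comments:

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Simplified skeleton of a JSON-expanding SMT; not the real implementation
public class ExpandJsonSmt<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // The real implementation parses the "keySchema"/"valueSchema" columns,
        // expands "rootId" and "materialization" into typed Connect structs and
        // rewrites DELETEs into tombstones; here the original key/value are kept.
        String aggregateTopic = "customers-complete"; // would be read from "rootType"

        return record.newRecord(aggregateTopic, record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}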

When now browsing the "customers-complete" topic, we’ll see the strongly typed Kafka Connect records we’d expect:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int64",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false,
        "name": "customers-complete.Key"
    },
    "payload": {
        "id": 1004
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [ ... ],
        "optional": true,
        "name": "urn:jsonschema:com:example:domain:Customer"
    },
    "payload": {
        "id": 1004,
        "firstName": "Anne",
        "lastName": "Kretchmar",
        "email": "annek@noanswer.org",
        "active": true,
        "tags" : [ "long-term", "vip" ],
        "birthday" : 5098,
        "category": {
            "id": 100001,
            "name": "Retail"
        },
        "addresses": [
            {
                "id": 16,
                "street": "1289 University Hill Road",
                "city": "Canehill",
                "state": "Arkansas",
                "zip": "72717",
                "type": "LIVING"
            }
        ]
    }
}

To confirm that these are actual typed Kafka Connect records and not just a single JSON string field, you could for instance use the Avro message converter and examine the message schemas in the schema registry.
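
Registering the connector with per-connector converter overrides along these lines would do (the schema registry URL is an assumption depending on your environment):

...
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
...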

Sinking Aggregate Messages Into Elasticsearch

The last missing step is to register the Confluent Elasticsearch sink connector, hooking it up with the "customers-complete" topic and letting it push any changes to the corresponding index:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<-EOF
  {
      "name": "es-customers",
      "config": {
          "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
          "tasks.max": "1",
          "topics": "customers-complete",
          "connection.url": "http://elastic:9200",
          "key.ignore": "false",
          "schema.ignore" : "false",
          "behavior.on.null.values" : "delete",
          "type.name": "customer-with-addresses",
          "transforms" : "key",
          "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.key.field": "id"
      }
  }
EOF

This uses Connect’s ExtractField transformation to obtain just the actual id value from the key struct and use it as the key for the corresponding Elasticsearch documents. Specifying the "behavior.on.null.values" option will let the connector delete the corresponding document from the index when encountering a tombstone message (i.e. a message with a key but without value).

Finally, we can use the Elasticsearch REST API to browse the index and of course use its powerful full-text query language to find customers by the address or any other property embedded into the aggregate structure:

> curl -X GET -H "Accept:application/json" \
  http://localhost:9200/customers-complete/_search?pretty

  {
      "_shards": {
          "failed": 0,
          "successful": 5,
          "total": 5
      },
      "hits": {
          "hits": [
              {
                  "_id": "1004",
                  "_index": "customers-complete",
                  "_score": 1.0,
                  "_source": {
                      "active": true,
                      "addresses": [
                          {
                              "city": "Canehill",
                              "id": 16,
                              "state": "Arkansas",
                              "street": "1289 University Hill Road",
                              "type": "LIVING",
                              "zip": "72717"
                          }
                      ],
                      "tags" : [ "long-term", "vip" ],
                      "birthday" : 5098,
                      "category": {
                          "id": 100001,
                          "name": "Retail"
                      },
                      "email": "annek@noanswer.org",
                      "firstName": "Anne",
                      "id": 1004,
                      "lastName": "Kretchmar",
                        "scores": [],
                        "someBlob": null
                  },
                  "_type": "customer-with-addresses"
              }
          ],
          "max_score": 1.0,
          "total": 1
      },
      "timed_out": false,
      "took": 11
  }

And there you have it: a customer’s complete data, including their addresses, categories, tags etc., materialized into a single document within Elasticsearch. If you’re using JPA to update the customer, you’ll see the data in the index being updated accordingly in near-realtime.

Pros and Cons

So what are the advantages and disadvantages of this approach for materializing aggregates from multiple source tables compared to the KStreams-based approach?

The big advantage is consistency and awareness of transactional boundaries, whereas the KStreams-based solution in its suggested form was prone to exposing intermediary aggregates. For instance, if you’re storing a customer and three addresses, it might happen that the streaming query first creates an aggregation of the customer and the two addresses inserted first, and shortly thereafter the complete aggregate with all three addresses. This is not the case for the approach discussed here, as you’ll only ever stream complete aggregates to Kafka. Also this approach feels a bit more "light-weight", i.e. a simple marker annotation (together with some Jackson annotations for fine-tuning the emitted JSON structures) is enough in order to materialize aggregates from your domain model, whereas more effort was needed to set up the required streams, temporary tables etc. with the KStreams solution.

The downside of driving aggregations through the application layer is that it’s not fully agnostic to the way you access the primary data. If you bypass the application, e.g. by patching data directly in the database, naturally these updates would be missed, requiring a refresh of affected aggregates. This, though, could again be done through change data capture and Debezium: change events to source tables could be captured and consumed by the application itself, allowing it to re-materialize aggregates after external data changes. You also might argue that running JSON serializations within source transactions and storing aggregates within the source database represents some overhead. This may often be acceptable, though.

Another question to ask is what’s the advantage of using change data capture on an intermediary aggregate table over simply posting REST requests to Elasticsearch. The answer is greatly increased robustness and fault tolerance. If the Elasticsearch cluster can’t be accessed for some reason, the machinery of Kafka and Kafka Connect will ensure that any change events will be propagated eventually, once the sink is up again. Also, other consumers than Elasticsearch can subscribe to the aggregate topic, the log can be replayed from the beginning etc.

Note that while we’ve been talking primarily about using Elasticsearch as a data sink, there are also other datastores and connectors that support complexly structured records. One example would be MongoDB and the sink connector maintained by Hans-Peter Grahsl, which one could use to sink customer aggregates into MongoDB, for instance enabling efficient retrieval of a customer and all their associated data with a single primary key look-up.

Outlook

The Hibernate ORM extension as well as the SMT discussed in this post can be found in our examples repository. They should be considered to be at "proof-of-concept" level currently.

That being said, we’re considering making this a proper Debezium component, allowing you to employ this aggregation approach within your Hibernate-based applications just by pulling in this new component. For that we’d have to improve a few things first, though. Most importantly, an API is needed which will let you (re-)create aggregates on demand, e.g. for existing data or for data updated by bulk updates via the Criteria API / JPQL (which will be missed by listeners). Aggregates should also be re-created automatically if any of the referenced entities change (with the current PoC, only a change to the customer instance itself will trigger its aggregate view to be rebuilt, but not a change to one of its addresses).

If you like this idea, then let us know about it, so we can gauge the general interest in this. Also, this would be a great item to work on, if you’re interested in contributing to the Debezium project. Looking forward to hearing from you, e.g. in the comment section below or on our mailing list.

Thanks a lot to Hans-Peter Grahsl for his feedback on an earlier version of this post!


