Automating Cache Invalidation With Change Data Capture

The second-level cache of Hibernate ORM / JPA is a proven and efficient way to increase application performance: caching read-only or rarely modified entities avoids roundtrips to the database, resulting in improved response times of the application.

Unlike the first-level cache, the second-level cache is associated with the session factory (or entity manager factory in JPA terms), so its contents are shared across transactions and concurrent sessions. Naturally, if a cached entity gets modified, the corresponding cache entry must be updated (or purged from the cache), too. As long as the data changes are done through Hibernate ORM, this is nothing to worry about: the ORM will update the cache automatically.

Things get tricky, though, when the application is bypassed, e.g. when records are modified directly in the database. Hibernate ORM then has no way of knowing that the cached data has become stale, and the affected items must be invalidated explicitly. A common way of addressing this is to provide some admin functionality for clearing an application’s caches. For this to work, it’s vital not to forget to invoke that functionality after every external change, or the application will keep working with outdated cached data.

In the following we’re going to explore an alternative approach for cache invalidation, which works in a reliable and fully automated way: by employing Debezium and its change data capture (CDC) capabilities, you can track data changes in the database itself and react to any applied change. This allows you to invalidate affected cache entries in near-realtime, without the risk of stale data due to missed changes. If an entry has been evicted from the cache, Hibernate ORM will load the latest version of the entity from the database the next time it is requested.

The Example Application

As an example, consider this simple model of two entities, PurchaseOrder and Item:

Example domain model

A purchase order represents the order of an item, where its total price is the ordered quantity times the item’s base price.
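
That rule is simple enough to state in code; the following snippet is just an illustration of the pricing logic (the getter names are assumed), not the example’s actual service code:

// Illustration only: the example's order service applies this rule when a
// purchase order is persisted
BigDecimal totalPrice = item.getPrice().multiply(BigDecimal.valueOf(quantity));
// e.g. 14.99 * 2 = 29.98, as seen in the curl responses further below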

Source Code

The source code of this example is provided on GitHub. If you want to follow along and try out all the steps described in the following, clone the repo and follow the instructions in README.md for building the project.

Modelling order and item as JPA entities is straightforward:

@Entity
public class PurchaseOrder {

    @Id
    @GeneratedValue(generator = "sequence")
    @SequenceGenerator(
        name = "sequence", sequenceName = "seq_po", initialValue = 1001, allocationSize = 50
    )
    private long id;
    private String customer;
    @ManyToOne private Item item;
    private int quantity;
    private BigDecimal totalPrice;

    // ...
}

As changes to items are rare, the Item entity should be cached. This can be done by simply specifying JPA’s @Cacheable annotation:

@Entity
@Cacheable
public class Item {

    @Id
    private long id;
    private String description;
    private BigDecimal price;

    // ...
}

You also need to enable the second-level cache in the META-INF/persistence.xml file. The property hibernate.cache.use_second_level_cache activates the cache itself, and the ENABLE_SELECTIVE cache mode causes only those entities annotated with @Cacheable to be put into the cache. It’s also a good idea to enable SQL query logging and cache access statistics. That way you’ll be able to verify whether things work as expected by examining the application log:

<?xml version="1.0" encoding="utf-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="..."
    version="2.2">

    <persistence-unit name="orders-PU-JTA" transaction-type="JTA">
        <jta-data-source>java:jboss/datasources/OrderDS</jta-data-source>
        <shared-cache-mode>ENABLE_SELECTIVE</shared-cache-mode>
        <properties>
            <property name="hibernate.cache.use_second_level_cache" value="true" />

            <property name="hibernate.show_sql" value="true" />
            <property name="hibernate.format_sql" value="true" />
            <property name="hibernate.generate_statistics" value="true" />

            <!-- dialect etc. ... -->
        </properties>
    </persistence-unit>
</persistence>

When running on a Java EE application server (or Jakarta EE, as the stack is called since its donation to the Eclipse Foundation), that’s all you need to enable second-level caching. In the case of WildFly (which is what’s used in the example project), the Infinispan key/value store is used as the cache provider by default.

Now let’s see what happens when an item’s price is modified by running some SQL directly in the database, bypassing the application layer. If you’ve checked out the example source code, comment out the DatabaseChangeEventListener class and start the application as described in the README.md. You can then place purchase orders using curl like this (a couple of example items have been persisted at application start-up):

> curl -H "Content-Type: application/json" \
  -X POST \
  --data '{ "customer" : "Billy-Bob", "itemId" : 10003, "quantity" : 2 }' \
  http://localhost:8080/cache-invalidation/rest/orders
{
    "id" : 1002,
    "customer" : "Billy-Bob",
    "item" : {
        "id" :10003,
        "description" : "North By Northwest",
        "price" : 14.99
    },
    "quantity" : 2,
    "totalPrice" : 29.98
}

The response is the expected one, as the item price is 14.99. Now update the item’s price directly in the database. The example uses Postgres, so you can use the psql CLI utility to do so:

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"'

Placing another purchase order for the same item using curl, you’ll see that the calculated total price doesn’t reflect the update. Not good! But it’s not too surprising, given that the price update was applied completely bypassing the application layer and Hibernate ORM.

The Change Event Handler

Now let’s explore how to use Debezium and CDC to react to changes in the item table and invalidate corresponding cache entries.

While Debezium is most often deployed into Kafka Connect (thus streaming change events into Apache Kafka topics), it has another mode of operation that comes in very handy for the use case at hand. Using the embedded engine, you can run the Debezium connectors as a library directly within your application. For each change event received from the database, a configured callback method will be invoked, which in the case at hand will evict the affected item from the second-level cache.

The following picture shows the design of this approach:

Architecture Overview

While this doesn’t come with the scalability and fault tolerance provided by Apache Kafka, it nicely fits the given requirements. As the second-level cache is bound to the application lifecycle, there is for instance no need for the offset management and restarting capabilities provided by the Kafka Connect framework. For the given use case it is enough to receive data change events while the application is running, and using the embedded engine enables exactly that.

Clustered Applications

Note that it still might make sense to use Apache Kafka and the regular deployment of Debezium into Kafka Connect when running a clustered application where each node has a local cache. Instead of registering a connector on each node, Kafka and Connect would allow you to deploy a single connector instance and have the application nodes listen to the topic(s) with the change events. This would result in less resource utilization in the database.

Having added the dependencies of the Debezium embedded engine (io.debezium:debezium-embedded:0.9.0.Beta1) and the Debezium Postgres connector (io.debezium:debezium-connector-postgres:0.9.0.Beta1) to your project, a class DatabaseChangeEventListener for listening to any changes in the database can be implemented like this:

@ApplicationScoped
public class DatabaseChangeEventListener {

    @Resource
    private ManagedExecutorService executorService;

    @PersistenceUnit private EntityManagerFactory emf;

    @PersistenceContext
    private EntityManager em;

    private EmbeddedEngine engine;

    public void startEmbeddedEngine(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Configuration config = Configuration.empty()
                .withSystemProperties(Function.identity()).edit()
                .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
                .with(EmbeddedEngine.ENGINE_NAME, "cache-invalidation-engine")
                .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class)
                .with("name", "cache-invalidation-connector")
                .with("database.hostname", "postgres")
                .with("database.port", 5432)
                .with("database.user", "postgresuser")
                .with("database.password", "postgrespw")
                .with("database.server.name", "dbserver1")
                .with("database.dbname", "inventory")
                .with("database.whitelist", "public")
                .with("snapshot.mode", "never")
                .build();

        this.engine = EmbeddedEngine.create()
                .using(config)
                .notifying(this::handleDbChangeEvent)
                .build();

        executorService.execute(engine);
    }

    @PreDestroy
    public void shutdownEngine() {
        engine.stop();
    }

    private void handleDbChangeEvent(SourceRecord record) {
        if (record.topic().equals("dbserver1.public.item")) {
            Long itemId = ((Struct) record.key()).getInt64("id");
            Struct payload = (Struct) record.value();
            Operation op = Operation.forCode(payload.getString("op"));

            if (op == Operation.UPDATE || op == Operation.DELETE) {
                emf.getCache().evict(Item.class, itemId);
            }
        }
    }
}

Upon application start-up, this configures an instance of the Debezium Postgres connector and sets up the embedded engine for running the connector. The connector options (host name, credentials etc.) are mostly the same as when deploying the connector into Kafka Connect. There is no need for doing an initial snapshot of the existing data, hence the snapshot mode is set to "never".

The offset storage option is used for controlling how connector offsets should be persisted. As it’s not necessary to process any change events occurring while the connector is not running (instead you’d just begin to read the log from the current location after the restart), the in-memory implementation provided by Kafka Connect is used.

Once configured, the embedded engine must be run via an Executor instance. As the example runs in WildFly, a managed executor can simply be obtained through @Resource injection for that purpose (see JSR 236).

The embedded engine is configured to invoke the handleDbChangeEvent() method for each received data change event. This method first checks whether the incoming event originates from the item table. If that’s the case, and if the change event represents an UPDATE or DELETE statement, the affected Item instance is evicted from the second-level cache. JPA 2.0 provides a simple API for this purpose, accessible via the EntityManagerFactory.
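
Besides the single-entry eviction used above, the standard javax.persistence.Cache interface (obtained from the injected EntityManagerFactory) offers a few related operations that can be useful for diagnostics or coarser-grained invalidation:

Cache cache = emf.getCache();

boolean itemIsCached = cache.contains(Item.class, 10003L); // is this particular item cached?
cache.evict(Item.class);                                   // evict all cached Item entries
cache.evictAll();                                          // clear the entire second-level cache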

With the DatabaseChangeEventListener class in place, the cache entry will now automatically be evicted when doing another item update via psql. When placing the first purchase order for that item after the update, you’ll see in the application log how Hibernate ORM executes a query SELECT ... FROM item ... in order to load the item referenced by the order. Also the cache statistics will report one "L2C miss". Upon subsequent orders of that same item it will be obtained from the cache again.

Eventual Consistency

While the event handling happens in near-realtime, it’s important to point out that it still applies eventual consistency semantics. This means that there is a very short time window between the point in time where a transaction is committed and the point in time where the change event is streamed from the log to the event handler and the cache entry is invalidated.

Avoiding Cache Invalidations After Application-triggered Data Changes

The change event listener shown above satisfies the requirement of invalidating cached items after external data changes. In its current form, however, it evicts cache entries a bit too aggressively: cached items will also be purged when an Item instance is updated through the application itself. This is not only unnecessary (the cached item already is the current version), but even counter-productive: the superfluous cache evictions cause additional database roundtrips, resulting in longer response times.

It is therefore necessary to distinguish between data changes performed by the application itself and external data changes. Only in the latter case should the affected items be evicted from the cache. To do so, you can leverage the fact that each Debezium data change event contains the id of the originating transaction. Keeping track of all transactions run by the application itself allows the cache eviction to be triggered only for items altered by external transactions.

Accounting for this change, the overall architecture looks like so:

Architecture Overview with Transaction Registry

The first thing to implement is the transaction registry, i.e. a class for the transaction bookkeeping:

@ApplicationScoped
public class KnownTransactions {

    private final DefaultCacheManager cacheManager;
    private final Cache<Long, Boolean> applicationTransactions;

    public KnownTransactions() {
        cacheManager = new DefaultCacheManager();
        cacheManager.defineConfiguration(
                "tx-id-cache",
                new ConfigurationBuilder()
                    .expiration()
                        .lifespan(60, TimeUnit.SECONDS)
                    .build()
                );

        applicationTransactions = cacheManager.getCache("tx-id-cache");
    }

    @PreDestroy
    public void stopCacheManager() {
        cacheManager.stop();
    }

    public void register(long txId) {
        applicationTransactions.put(txId, true);
    }

    public boolean isKnown(long txId) {
        return Boolean.TRUE.equals(applicationTransactions.get(txId));
    }
}

This uses the Infinispan DefaultCacheManager for creating and maintaining an in-memory cache of transaction ids encountered by the application. As data change events arrive in near-realtime, the TTL of the cache entries can be rather short (in fact, the value of one minute shown in the example is chosen very conservatively, usually events should be received within seconds).

The next step is to retrieve the current transaction id whenever a request is processed by the application and register it within KnownTransactions. This should happen once per transaction. There are multiple ways for implementing this logic; in the following a Hibernate ORM FlushEventListener is used for this purpose:

class TransactionRegistrationListener implements FlushEventListener {

    private volatile KnownTransactions knownTransactions;

    public TransactionRegistrationListener() {
    }

    @Override
    public void onFlush(FlushEvent event) throws HibernateException {
        event.getSession().getActionQueue().registerProcess( session -> {
            Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()")
                    .setFlushMode(FlushMode.MANUAL)
                    .getSingleResult();

            getKnownTransactions().register(txId.longValue());
        } );
    }

    private KnownTransactions getKnownTransactions() {
        KnownTransactions value = knownTransactions;

        if (value == null) {
            knownTransactions = value = CDI.current().select(KnownTransactions.class).get();
        }

        return value;
    }
}

As there’s no portable way to obtain the transaction id, this is done using a native SQL query. In the case of Postgres, the txid_current() function can be called for that. Hibernate ORM event listeners are not subject to dependency injection via CDI. Hence the static current() method is used to obtain a handle to the application’s CDI container and get a reference to the KnownTransactions bean.

This listener will be invoked whenever Hibernate ORM is synchronizing its persistence context with the database ("flushing"), which usually happens exactly once when the transaction is committed.

Manual Flushes

The session / entity manager can also be flushed manually, in which case the txid_current() function would be invoked multiple times. That’s neglected here for the sake of simplicity. The actual code in the example repo contains a slightly extended version of this class which makes sure that the transaction id is obtained only once.

To register the flush listener with Hibernate ORM, an Integrator implementation must be created and declared in the META-INF/services/org.hibernate.integrator.spi.Integrator file:

public class TransactionRegistrationIntegrator implements Integrator {

    @Override
    public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory,
            SessionFactoryServiceRegistry serviceRegistry) {
        serviceRegistry.getService(EventListenerRegistry.class)
            .appendListeners(EventType.FLUSH, new TransactionRegistrationListener());
    }

    @Override
    public void disintegrate(SessionFactoryImplementor sessionFactory,
            SessionFactoryServiceRegistry serviceRegistry) {
    }
}
The META-INF/services/org.hibernate.integrator.spi.Integrator file simply contains the fully-qualified name of the integrator class:

io.debezium.examples.cacheinvalidation.persistence.TransactionRegistrationIntegrator

During bootstrap, Hibernate ORM will detect the integrator class (by means of the Java service loader), invoke its integrate() method which in turn will register the listener class for the FLUSH event.

The last step is to exclude any events stemming from transactions run by the application itself in the database change event handler:

@ApplicationScoped
public class DatabaseChangeEventListener {

    // ...

    @Inject
    private KnownTransactions knownTransactions;

    private void handleDbChangeEvent(SourceRecord record) {
        if (record.topic().equals("dbserver1.public.item")) {
            Long itemId = ((Struct) record.key()).getInt64("id");
            Struct payload = (Struct) record.value();
            Operation op = Operation.forCode(payload.getString("op"));
            Long txId = ((Struct) payload.get("source")).getInt64("txId");

            if (!knownTransactions.isKnown(txId) &&
                    (op == Operation.UPDATE || op == Operation.DELETE)) {
                emf.getCache().evict(Item.class, itemId);
            }
        }
    }
}

And with that, all the pieces are in place: cached Items will only be evicted after external data changes, but not after changes done by the application itself. To confirm, you can invoke the example’s items resource using curl:

> curl -H "Content-Type: application/json" \
  -X PUT \
  --data '{ "description" : "North by Northwest", "price" : 20.99}' \
  http://localhost:8080/cache-invalidation/rest/items/10003

When placing the next order for the item after this update, you should see that the Item entity is obtained from the cache, i.e. the change event will not have caused the item’s cache entry to be evicted. In contrast, if you update the item’s price via psql another time, the item should be removed from the cache and the order request will produce a cache miss, followed by a SELECT against the item table in the database.

Summary

In this blog post we’ve explored how Debezium and change data capture can be employed to invalidate application-level caches after external data changes. Compared to manual cache invalidation, this approach works very reliably (by capturing changes directly from the database log, no events will be missed) and fast (cache eviction happens in near-realtime after the data changes).

As you have seen, not too much glue code is needed in order to implement this. While the shown implementation is somewhat specific to the entities of the example, it should be possible to implement the change event handler in a more generic fashion, so that it can handle a set of configured entity types (essentially, the database change listener would have to convert the primary key field(s) from the change events into the primary key type of the corresponding entities in a generic way). Such a generic implementation would also have to provide the logic for obtaining the current transaction id for the most commonly used databases.
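
As a rough illustration of that idea, the eviction logic could be driven by a mapping from change event topics to entity types; the names and structure in this sketch are hypothetical and not part of the example code:

private static final Map<String, Class<?>> CACHED_ENTITY_TYPES = new HashMap<>();

static {
    CACHED_ENTITY_TYPES.put("dbserver1.public.item", Item.class);
    // further cached entity types would be registered here
}

private void handleDbChangeEvent(SourceRecord record) {
    Class<?> entityType = CACHED_ENTITY_TYPES.get(record.topic());

    if (entityType == null) {
        return;
    }

    Struct payload = (Struct) record.value();
    Operation op = Operation.forCode(payload.getString("op"));
    Long txId = ((Struct) payload.get("source")).getInt64("txId");

    if (!knownTransactions.isKnown(txId) &&
            (op == Operation.UPDATE || op == Operation.DELETE)) {
        // a fully generic version would convert the key field(s) into the
        // entity's actual id type here
        Object id = ((Struct) record.key()).get("id");
        emf.getCache().evict(entityType, id);
    }
}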

Please let us know whether you think this would be an interesting extension to have for Debezium and Hibernate ORM. For instance, this could be a new module under the Debezium umbrella, and it could also be a great project to work on, should you be interested in contributing to Debezium. If you have any thoughts on this idea, please post a comment below or reach out on our mailing list.

Many thanks to Guillaume Smet, Hans-Peter Grahsl and Jiri Pechanec for their feedback while writing this post!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Materializing Aggregate Views With Hibernate and Debezium

Updating external full text search indexes (e.g. Elasticsearch) after data changes is a very popular use case for change data capture (CDC).

As we’ve discussed in a blog post a while ago, the combination of Debezium’s CDC source connectors and Confluent’s sink connector for Elasticsearch makes it straightforward to capture data changes in MySQL, Postgres etc. and push them towards Elasticsearch in near real-time. This results in a 1:1 relationship between tables in the source database and a corresponding search index in Elasticsearch, which is perfectly fine for many use cases.

It gets more challenging though if you’d like to put entire aggregates into a single index. An example could be a customer and all their addresses; those would typically be stored in two separate tables in an RDBMS, linked by a foreign key, whereas you’d like to have just one index in Elasticsearch, containing documents of customers with their addresses embedded, allowing you to efficiently search for customers based on their address.

Following up on the KStreams-based solution we described recently, we’d like to present in this post an alternative for materializing such aggregate views, driven by the application layer.

Overview

The idea is to materialize views in a separate table in the source database, right in the moment the original data is altered.

Aggregates are serialized as JSON structures (which naturally can represent any nested object structure) and stored in a specific table. This is done within the actual transaction altering the data, which means the aggregate view is always consistent with the primary data. In particular, this approach isn’t prone to exposing intermediary aggregations, unlike the KStreams-based solution discussed in the post linked above.

The following picture shows the overall architecture:

Streaming Materialized Aggregate Views to Elasticsearch

Here the aggregate views are materialized by means of a small extension to Hibernate ORM, which stores the JSON aggregates within the source database (note that "aggregate views" are conceptually similar to the "materialized views" known from various RDBMS, in that they materialize the result of a "join" operation; technically, though, we’re storing them in a regular table rather than using that feature). Changes to that aggregate table are then captured by Debezium and streamed to one topic per aggregate type. The Elasticsearch sink connector can subscribe to these topics and update corresponding full-text indexes.

You can find a proof-of-concept implementation (said Hibernate extension and related code) of this idea in our examples repository. Of course the general idea isn’t limited to Hibernate ORM or JPA, you could implement something similar with any other API you’re using to access your data.

Creating Aggregate Views via Hibernate ORM

For the following let’s assume we’re persisting a simple domain model (comprising a Customer entity and a few related ones like Address, (customer) Category etc.) in a database. Using Hibernate for that allows us to make the creation of aggregates fully transparent to the actual application code using a Hibernate event listener. Thanks to its extensible architecture, we can plug such listener into Hibernate just by adding it to the classpath, from where it will be picked up automatically when bootstrapping the entity manager / session factory.

Our example listener reacts to an annotation, @MaterializeAggregate, which marks those entity types that should be the roots of materialized aggregates.
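
The annotation itself is plain; a minimal definition (reconstructed here for illustration, the actual one lives in the examples repository) could look like this:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MaterializeAggregate {

    /** Name of the aggregate; also used as the name of the corresponding Kafka topic. */
    String aggregateName();
}

Applied to the Customer entity, this looks as follows: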

@Entity
@MaterializeAggregate(aggregateName="customers-complete")
public class Customer {

    @Id
    private long id;

    private String firstName;

    @OneToMany(mappedBy = "customer", fetch = FetchType.EAGER, cascade = CascadeType.ALL)
    private Set<Address> addresses;

    @ManyToOne
    private Category category;

    ...
}

Now if any entity annotated with @MaterializeAggregate is inserted, updated or deleted via Hibernate, the listener will kick in and materialize a JSON view of the aggregate root (customer) and its associated entities (addresses, category).
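
In a heavily simplified form, that materialization step boils down to checking for the annotation and serializing the aggregate root with Jackson; the following sketch omits the schema derivation and the actual write to the aggregates table that the real listener performs:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AggregateMaterializer {

    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Returns the JSON materialization of the given aggregate root, or null if
     * its type isn't marked with @MaterializeAggregate.
     */
    public String materialize(Object aggregateRoot) {
        MaterializeAggregate materialize = aggregateRoot.getClass()
                .getAnnotation(MaterializeAggregate.class);

        if (materialize == null) {
            return null;
        }

        try {
            return mapper.writeValueAsString(aggregateRoot);
        }
        catch (JsonProcessingException e) {
            throw new IllegalStateException("Couldn't materialize aggregate", e);
        }
    }
}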

Under the hood the Jackson API is used for serializing the model into JSON. This means you can use any of its annotations to customize the JSON output, e.g. @JsonIgnore to exclude the inverse relationship from Address to Customer:

@Entity
public class Address {

    @Id
    private long id;

    @ManyToOne
    @JoinColumn(name = "customer_id")
    @JsonIgnore
    private Customer customer;

    private String street;

    private String city;

    ...
}

Note that Address itself isn’t marked with @MaterializeAggregate, i.e. it won’t be materialized into an aggregate view by itself.

After using JPA’s EntityManager to insert or update a few customers, let’s take a look at the aggregates table which has been populated by the listener (value schema omitted for the sake of brevity):

> select * from aggregates;

rootType | customers-complete

keySchema |
{
  "schema" : {
    "type" : "struct",
    "fields" : [ {
      "type" : "int64",
      "optional" : false,
      "field" : "id"
    } ],
    "optional" : false,
    "name" : "customers-complete.Key"
  }
}

rootId | { "id" : 1004 }

materialization |
{
  "id" : 1004,
  "firstName" : "Anne",
  "lastName" : "Kretchmar",
  "email" : "annek@noanswer.org",
  "tags" : [ "long-term", "vip" ],
  "birthday" : 5098,
  "category" : {
    "id" : 100001,
    "name" : "Retail"
  },
  "addresses" : [ {
    "id" : 16,
    "street" : "1289 University Hill Road",
    "city" : "Canehill",
    "state" : "Arkansas",
    "zip" : "72717",
    "type" : "SHIPPING"
  } ]
}

valueSchema | { "schema" : { ... } }

The table contains these columns:

  • rootType: The name of the aggregate as given in the @MaterializeAggregate annotation

  • rootId: The aggregate’s id as serialized JSON

  • materialization: The aggregate itself as serialized JSON; in this case a customer and their addresses, category etc.

  • keySchema: The Kafka Connect schema of the row’s key

  • valueSchema: The Kafka Connect schema of the materialization

Let’s talk about the two schema columns for a bit. JSON itself is quite limited as far as its supported data types are concerned. For instance, we’d lose information about a numeric field’s value range (int vs. long etc.) without any additional information. Therefore the listener derives the corresponding schema information for key and aggregate view from the entity model and stores it within the aggregate records.
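
For instance, the key schema shown above corresponds to what Kafka Connect’s SchemaBuilder produces for a struct with a single 64-bit id field (just a sketch of the equivalent builder calls, not the listener’s actual code):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

Schema keySchema = SchemaBuilder.struct()
        .name("customers-complete.Key")
        .field("id", Schema.INT64_SCHEMA) // non-optional 64-bit integer, as in the JSON above
        .build();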

Now Jackson itself only supports JSON Schema, which would be a bit too limited for our purposes. Hence the example implementation provides custom serializers for Jackson’s schema system, which allow us to emit Kafka Connect’s schema representation (with more precise type information) instead of plain JSON Schema. This will come in handy in the following when we’d like to expand the string-based JSON representations of key and value into properly typed Kafka Connect records.

Capturing Changes to the Aggregate Table

We now have a mechanism in place which transparently persists aggregates into a separate table within the source database whenever the application data is changed through Hibernate. Note that this happens within the boundaries of the source transaction, so if that transaction is rolled back for some reason, the aggregate view will not be updated either.

The Hibernate listener uses insert-or-update semantics when writing an aggregate view, i.e. for a given aggregate root there’ll always be exactly one corresponding entry in the aggregate table which reflects its current state. If an aggregate root entity is deleted, the listener will also drop the entry from the aggregate table.

So let’s set up Debezium now to capture any changes to the aggregates table:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<-EOF
  {
      "name": "inventory-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "debezium",
          "database.password": "dbz",
          "database.server.id": "184054",
          "database.server.name": "dbserver1",
          "database.whitelist": "inventory",
          "table.whitelist": ".*aggregates",
          "database.history.kafka.bootstrap.servers": "kafka:9092",
          "database.history.kafka.topic": "schema-changes.inventory"
      }
  }
EOF

This registers the MySQL connector with the "inventory" database (we’re using an expanded version of the schema from the Debezium tutorial), capturing any changes to the "aggregates" table.

Expanding JSON

If we now were to browse the corresponding Kafka topic, we’d see data change events in the known Debezium format for all the changes to the aggregates table.

The "materialization" field within the records' "after" state is still a single field containing a JSON string, though. What we’d rather like to have is a strongly typed Kafka Connect record, whose schema exactly describes the aggregate structure and the types of its fields. For that purpose the example project provides an SMT (single message transform) which takes the JSON materialization and the corresponding valueSchema and converts this into a full-blown Kafka Connect record. The same is done for keys. DELETE events are rewritten into tombstone events. Finally, the SMT re-routes every record to a topic named after the aggregate root, allowing consumers to subscribe just to changes to specific aggregate types.
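
In Kafka Connect terms such an SMT implements the Transformation contract; the skeleton below only hints at the conversion logic, which in full can be found in the ExpandJsonSmt class of the example project:

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class ExpandJsonSmt<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public R apply(R record) {
        // parse the JSON key/value schemas and the materialization from the incoming
        // record, build typed Schema/Struct instances from them and return a new
        // record, re-routed to a topic named after the aggregate root
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }
}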

So let’s add that SMT when registering the Debezium CDC connector:

...
"transforms":"expandjson",
"transforms.expandjson.type":"io.debezium.aggregation.smt.ExpandJsonSmt",
...

When now browsing the "customers-complete" topic, we’ll see the strongly typed Kafka Connect records we’d expect:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int64",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false,
        "name": "customers-complete.Key"
    },
    "payload": {
        "id": 1004
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [ ... ],
        "optional": true,
        "name": "urn:jsonschema:com:example:domain:Customer"
    },
    "payload": {
        "id": 1004,
        "firstName": "Anne",
        "lastName": "Kretchmar",
        "email": "annek@noanswer.org",
        "active": true,
        "tags" : [ "long-term", "vip" ],
        "birthday" : 5098,
        "category": {
            "id": 100001,
            "name": "Retail"
        },
        "addresses": [
            {
                "id": 16,
                "street": "1289 University Hill Road",
                "city": "Canehill",
                "state": "Arkansas",
                "zip": "72717",
                "type": "LIVING"
            }
        ]
    }
}

To confirm that these are actual typed Kafka Connect records and not just a single JSON string field, you could for instance use the Avro message converter and examine the message schemas in the schema registry.

Sinking Aggregate Messages Into Elasticsearch

The last missing step is to register the Confluent Elasticsearch sink connector, hooking it up with the "customers-complete" topic and letting it push any changes to the corresponding index:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<-EOF
  {
      "name": "es-customers",
      "config": {
          "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
          "tasks.max": "1",
          "topics": "customers-complete",
          "connection.url": "http://elastic:9200",
          "key.ignore": "false",
          "schema.ignore" : "false",
          "behavior.on.null.values" : "delete",
          "type.name": "customer-with-addresses",
          "transforms" : "key",
          "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.key.field": "id"
      }
  }
EOF

This uses Connect’s ExtractField transformation to obtain just the actual id value from the key struct and use it as key for the corresponding Elasticsearch documents. Specifying the "behavior.on.null.values" option will let the connector delete the corresponding document from the index when encountering a tombstone message (i.e. a message with a key but without value).

Finally, we can use the Elasticsearch REST API to browse the index and of course use its powerful full-text query language to find customers by the address or any other property embedded into the aggregate structure:

> curl -X GET -H "Accept:application/json" \
  http://localhost:9200/customers-complete/_search?pretty

  {
      "_shards": {
          "failed": 0,
          "successful": 5,
          "total": 5
      },
      "hits": {
          "hits": [
              {
                  "_id": "1004",
                  "_index": "customers-complete",
                  "_score": 1.0,
                  "_source": {
                      "active": true,
                      "addresses": [
                          {
                              "city": "Canehill",
                              "id": 16,
                              "state": "Arkansas",
                              "street": "1289 University Hill Road",
                              "type": "LIVING",
                              "zip": "72717"
                          }
                      ],
                      "tags" : [ "long-term", "vip" ],
                      "birthday" : 5098,
                      "category": {
                          "id": 100001,
                          "name": "Retail"
                      },
                      "email": "annek@noanswer.org",
                      "firstName": "Anne",
                      "id": 1004,
                      "lastName": "Kretchmar",
                      "scores": [],
                      "someBlob": null,
                      "tags": []
                  },
                  "_type": "customer-with-addresses"
              }
          ],
          "max_score": 1.0,
          "total": 1
      },
      "timed_out": false,
      "took": 11
  }

And there you have it: a customer’s complete data, including their addresses, categories, tags etc., materialized into a single document within Elasticsearch. If you’re using JPA to update the customer, you’ll see the data in the index being updated accordingly in near-realtime.

Pros and Cons

So what are the advantages and disadvantages of this approach for materializing aggregates from multiple source tables compared to the KStreams-based approach?

The big advantage is consistency and awareness of transactional boundaries, whereas the KStreams-based solution in its suggested form was prone to exposing intermediary aggregates. For instance, if you’re storing a customer and three addresses, it might happen that the streaming query first creates an aggregation of the customer and the two addresses inserted first, and shortly thereafter the complete aggregate with all three addresses. This is not the case for the approach discussed here, as you’ll only ever stream complete aggregates to Kafka. This approach also feels a bit more "light-weight", i.e. a simple marker annotation (together with some Jackson annotations for fine-tuning the emitted JSON structures) is enough to materialize aggregates from your domain model, whereas the KStreams solution required some more effort to set up the needed streams, temporary tables etc.

The downside of driving aggregations through the application layer is that it’s not fully agnostic to the way you access the primary data. If you bypass the application, e.g. by patching data directly in the database, these updates would naturally be missed, requiring a refresh of affected aggregates. Then again, this could be addressed with change data capture and Debezium, too: change events on the source tables could be captured and consumed by the application itself, allowing it to re-materialize aggregates after external data changes. You also might argue that running JSON serializations within source transactions and storing aggregates within the source database represents some overhead. This often may be acceptable, though.

Another question to ask is what the advantage of using change data capture on an intermediary aggregate table is, compared to simply posting REST requests to Elasticsearch. The answer is greatly increased robustness and fault tolerance. If the Elasticsearch cluster can’t be accessed for some reason, the machinery of Kafka and Kafka Connect will ensure that any change events are propagated eventually, once the sink is up again. Other consumers than Elasticsearch can also subscribe to the aggregate topic, the log can be replayed from the beginning etc.

Note that while we’ve been talking primarily about using Elasticsearch as a data sink, there are also other datastores and connectors that support complexly structured records. One example would be MongoDB and the sink connector maintained by Hans-Peter Grahsl, which one could use to sink customer aggregates into MongoDB, for instance enabling efficient retrieval of a customer and all their associated data with a single primary key look-up.

Outlook

The Hibernate ORM extension as well as the SMT discussed in this post can be found in our examples repository. They should be considered to be at "proof-of-concept" level currently.

That being said, we’re considering making this a proper Debezium component, allowing you to employ this aggregation approach within your Hibernate-based applications just by pulling in this new component. For that we’d have to improve a few things first, though. Most importantly, an API is needed which will let you (re-)create aggregates on demand, e.g. for existing data or for data updated through bulk updates via the Criteria API / JPQL (which are missed by listeners). Aggregates should also be re-created automatically if any of the referenced entities change (with the current PoC, only a change to the customer instance itself will trigger its aggregate view to be rebuilt, but not a change to one of its addresses).

If you like this idea, then let us know about it, so we can gauge the general interest in this. Also, this would be a great item to work on, if you’re interested in contributing to the Debezium project. Looking forward to hearing from you, e.g. in the comment section below or on our mailing list.

Thanks a lot to Hans-Peter Grahsl for his feedback on an earlier version of this post!



Streaming MySQL Data Changes to Amazon Kinesis

Most of the time, Debezium is used to stream data changes into Apache Kafka. But what if you’re using another streaming platform such as Apache Pulsar or a cloud-based solution such as Amazon Kinesis, Azure Event Hubs and the like? Can you still benefit from Debezium’s powerful change data capture (CDC) capabilities and ingest changes from databases such as MySQL, Postgres, SQL Server etc.?

Turns out, with just a bit of glue code, you can! In the following we’ll discuss how to use Debezium to capture changes in a MySQL database and stream the change events into Kinesis, a fully-managed data streaming service available on the Amazon cloud.

Introducing the Debezium Embedded Engine

Debezium is implemented as a set of connectors for Kafka and thus usually is run via Kafka Connect. But there’s one little gem in Debezium which isn’t as widely known yet: the embedded engine.

When using this engine, the Debezium connectors are not executed within Kafka Connect, but as a library embedded into your own Java application. For this purpose, the debezium-embedded module provides a small runtime environment which performs the tasks that’d otherwise be handled by the Kafka Connect framework: requesting change records from the connector, committing offsets etc. Each change record produced by the connector is passed to a configured event handler method, which in our case will convert the record into its JSON representation and submit it to a Kinesis stream, using the Kinesis Java API.

The overall architecture looks like so:

Debezium Embedded Engine Streaming to Amazon Kinesis

Now let’s walk through the relevant parts of the code required for that. A complete executable example can be found in the debezium-examples repo on GitHub.

Set-Up

In order to use Debezium’s embedded engine, add the debezium-embedded dependency as well as the Debezium connector of your choice to your project’s pom.xml. In the following we’re going to use the connector for MySQL. We also need to add a dependency to the Kinesis Client API, so these are the dependencies needed:

...
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>0.8.3.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>0.8.3.Final</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.9.0</version>
</dependency>
...

Configuring the Embedded Engine

The Debezium embedded engine is configured through an instance of io.debezium.config.Configuration. This class can obtain values from system properties or from a given config file, but for the sake of the example we’ll simply pass all required values via its fluent builder API:

Configuration config = Configuration.create()
    .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
    .with(EmbeddedEngine.ENGINE_NAME, "kinesis")
    .with(MySqlConnectorConfig.SERVER_NAME, "kinesis")
    .with(MySqlConnectorConfig.SERVER_ID, 8192)
    .with(MySqlConnectorConfig.HOSTNAME, "localhost")
    .with(MySqlConnectorConfig.PORT, 3306)
    .with(MySqlConnectorConfig.USER, "debezium")
    .with(MySqlConnectorConfig.PASSWORD, "dbz")
    .with(MySqlConnectorConfig.DATABASE_WHITELIST, "inventory")
    .with(MySqlConnectorConfig.TABLE_WHITELIST, "inventory.customers")
    .with(EmbeddedEngine.OFFSET_STORAGE,
        "org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
    .with(MySqlConnectorConfig.DATABASE_HISTORY,
        MemoryDatabaseHistory.class.getName())
    .with("schemas.enable", false)
    .build();

If you’ve ever set up the Debezium MySQL connector in Kafka Connect, most of the properties will look familiar to you.

But let’s talk about the OFFSET_STORAGE and DATABASE_HISTORY options in a bit more detail. They deal with how connector offsets and the database history should be persisted. When running the connector via Kafka Connect, both would typically be stored in specific Kafka topics. But that’s not an option here, so an alternative is needed. For this example we’re simply going to keep the offsets and database history in memory. I.e. if the engine is restarted, this information will be lost and the connector will start from scratch, e.g. with a new initial snapshot.

While out of scope for this blog post, it wouldn’t be too difficult to create alternative implementations of the OffsetBackingStore and DatabaseHistory contracts, respectively. For instance, if you’re fully committed to the AWS cloud services, you could think of storing offsets and database history in the DynamoDB NoSQL store. Note that, unlike Kafka, a Kinesis stream wouldn’t be suitable for storing the database history: the maximum retention period for Kinesis data streams is seven days, whereas the database history must be kept for the entire lifetime of the connector. Another alternative could be to use the existing filesystem-based implementations, FileOffsetBackingStore and FileDatabaseHistory.
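
As a sketch of that last option, switching to the file-based implementations is mostly a matter of configuration; the property names follow the Kafka Connect and Debezium documentation, and the file paths are just placeholders:

Configuration config = Configuration.create()
    // ... connector properties as shown above ...
    .with(EmbeddedEngine.OFFSET_STORAGE,
        "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    .with("offset.storage.file.filename", "/tmp/debezium-offsets.dat")
    .with(MySqlConnectorConfig.DATABASE_HISTORY,
        "io.debezium.relational.history.FileDatabaseHistory")
    .with("database.history.file.filename", "/tmp/debezium-dbhistory.dat")
    .build();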

The next step is to build an EmbeddedEngine instance from the configuration. Again this is done using a fluent API:

EmbeddedEngine engine = EmbeddedEngine.create()
    .using(config)
    .using(this.getClass().getClassLoader())
    .using(Clock.SYSTEM)
    .notifying(this::sendRecord)
    .build();

The most interesting part here is the notifying call. The method passed here is the one which will be invoked by the engine for each emitted data change record. So let’s take a look at the implementation of this method.

Sending Change Records to Kinesis

The sendRecord() method is where the magic happens. We’ll convert the incoming SourceRecord into an equivalent JSON representation and propagate it to a Kinesis stream.

For that, it’s important to understand some conceptual differences between Apache Kafka and Kinesis. Specifically, messages in Kafka have a key and a value (both of which are arbitrary byte arrays). In the case of Debezium, the key of a data change event represents the primary key of the affected record, and the value is a structure comprising the old and new row state as well as some additional metadata.

In Kinesis on the other hand a message contains a data blob (again an arbitrary byte sequence) and a partition key. Kinesis streams can be split up into multiple shards and the partition key is used to determine into which shard a given message should go.

Now one could think of mapping the key from Debezium’s change data events to the Kinesis partition key, but partition keys are limited to a length of 256 bytes. Depending on the length of the primary key column(s) in the captured tables, this might not be enough. So a safer option is to create a hash value from the change message key and use that as the partition key. This in turn means that the change message’s key structure should be added to the Kinesis message’s data blob alongside the actual value. While the key column values themselves are part of the value structure, too, a consumer otherwise wouldn’t know which column(s) make up the primary key.

With that in mind, let’s take a look at the sendRecord() implementation:

private void sendRecord(SourceRecord record) {
    // We are interested only in data events not schema change events
    if (record.topic().equals("kinesis")) {
        return;
    }

    // create schema for container with key *and* value
    Schema schema = SchemaBuilder.struct()
        .field("key", record.keySchema())
        .field("value", record.valueSchema())
        .build();

    Struct message = new Struct(schema);
    message.put("key", record.key());
    message.put("value", record.value());

    // create partition key by hashing the record's key
    String partitionKey = String.valueOf(
        record.key() != null ? record.key().hashCode() : -1);

    // create data blob representing the container by using Kafka Connect's
    // JSON converter
    final byte[] payload = valueConverter.fromConnectData(
        "dummy", schema, message);

    // Assemble the put-record request ...
    PutRecordRequest putRecord = new PutRecordRequest();

    putRecord.setStreamName(record.topic());
    putRecord.setPartitionKey(partitionKey);
    putRecord.setData(ByteBuffer.wrap(payload));

    // ... and execute it
    kinesisClient.putRecord(putRecord);
}

The code is quite straightforward; as discussed above, it first creates a container structure containing the key and value of the incoming source record. This structure is then converted into a binary representation using the JSON converter provided by Kafka Connect (an instance of JsonConverter). A PutRecordRequest is then assembled from that blob, the partition key and the change record’s topic name, which finally is sent to Kinesis.
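
The valueConverter referenced above is such a JsonConverter instance; a sketch of how it could be set up (the example repo may configure it slightly differently):

import java.util.Collections;

import org.apache.kafka.connect.json.JsonConverter;

JsonConverter valueConverter = new JsonConverter();

// "false" marks this as a value converter (not a key converter); disabling schemas
// keeps the data blob limited to the actual payload
valueConverter.configure(Collections.singletonMap("schemas.enable", false), false);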

The Kinesis client object can be re-used and is set up once like so:

// Uses the credentials from the local "default" AWS profile
AWSCredentialsProvider credentialsProvider =
    new ProfileCredentialsProvider("default");

this.kinesisClient = AmazonKinesisClientBuilder.standard()
    .withCredentials(credentialsProvider)
    .withRegion("eu-central-1") // use your AWS region here
    .build();

With that, we’ve set up an instance of Debezium’s EmbeddedEngine which runs the configured MySQL connector and passes each emitted change event to Amazon Kinesis. The last missing step is to actually run the engine. This is done on a separate thread using an Executor, e.g. like so:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

Note that you should also make sure to properly shut down the engine eventually. How that can be done is shown in the accompanying example in the debezium-examples repo.
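
One possible approach (a sketch only; the accompanying example may do this differently) is to register a JVM shutdown hook that stops the engine and the executor:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    engine.stop();
    executor.shutdown();

    try {
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));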

Running the Example

Finally let’s take a look at running the complete example and consuming the Debezium CDC events from the Kinesis stream. Start by cloning the examples repository and go to the kinesis directory:

git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/kinesis

Make sure you’ve met the prerequisites described in the example’s README.md; most notably you should have a local Docker installation and you’ll need to have set up an AWS account as well as have the AWS client tools installed. Note that Kinesis isn’t part of the free tier when registering with AWS, i.e. you’ll pay a (small) amount of money when executing the example. Don’t forget to delete the streams you’ve set up once done, we won’t pay your AWS bills :)

Now run Debezium’s MySQL example database to have some data to play with:

docker run -it --rm --name mysql -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw \
  debezium/example-mysql:0.8

Create a Kinesis stream for change events from the customers table:

aws kinesis create-stream --stream-name kinesis.inventory.customers \
  --shard-count 1

Execute the Java application that runs the Debezium embedded engine (if needed, adjust the value of the kinesis.region property in pom.xml to your own region first):

mvn exec:java

This will start up the engine and the MySQL connector, which takes an initial snapshot of the captured database.

In order to take a look at the CDC events in the Kinesis stream, the AWS CLI can be used (usually, you’d implement a Kinesis Streams application for consuming the events). To do so, set up a shard iterator first:

ITERATOR=$(aws kinesis get-shard-iterator --stream-name kinesis.inventory.customers --shard-id 0 --shard-iterator-type TRIM_HORIZON | jq '.ShardIterator')

Note how the jq utility is used to obtain the generated id of the iterator from the JSON structure returned by the Kinesis API. Next that iterator can be used to examine the stream:

aws kinesis get-records --shard-iterator $ITERATOR

You should receive an array of records like this:

{
    "Records": [
        {
            "SequenceNumber":
                "49587760482547027816046765529422807492446419903410339842",
            "ApproximateArrivalTimestamp": 1535551896.475,
            "Data": "eyJiZWZvcm...4OTI3MzN9",
            "PartitionKey": "eyJpZCI6MTAwMX0="
        },
        ...
    ]
}

The Data element is a Base64-encoded representation of the message’s data blob. Again jq comes in handy: we can use it to just extract the Data part of each record and decode the Base64 representation (make sure to use jq 1.6 or newer):

aws kinesis get-records --shard-iterator $ITERATOR | \
  jq -r '.Records[].Data | @base64d' | jq .

Now you should see the change events as JSON, each one with key and value:

{
  "key": {
    "id": 1001
  },
  "value": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "0.8.1.Final",
      "name": "kinesis",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "c",
    "ts_ms": 1535555325628
  }
}
...

Next let’s try and update a record in MySQL:

# Start MySQL CLI client
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 \
  sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" \
  -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

# In the MySQL client
use inventory;
update customers set first_name = 'Trudy' where id = 1001;

If you now fetch the iterator again, you should see one more data change event representing that update:

...

{
  "key": {
    "id": 1001
  },
  "value": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Trudy",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "0.8.1.Final",
      "name": "kinesis",
      "server_id": 223344,
      "ts_sec": 1535627629,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": false,
      "thread": 10,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "u",
    "ts_ms": 1535627622546
  }
}

Once you’re done, stop the embedded engine application by hitting Ctrl + C, stop the MySQL server by running docker stop mysql and delete the kinesis.inventory.customers stream in Kinesis.

Summary and Outlook

In this blog post we’ve demonstrated that Debezium can be used not only to stream data changes into Apache Kafka, but also into other streaming platforms such as Amazon Kinesis. By leveraging its embedded engine and implementing a bit of glue code, you can benefit from all the CDC connectors provided by Debezium and their capabilities and connect them to the streaming solution of your choice.

And we’re thinking about simplifying this usage of Debezium even further. Instead of requiring you to implement your own application that invokes the embedded engine API, we’re considering providing a small self-contained Debezium runtime which you can simply execute. It would be configured with the source connector to run and would make use of an outbound plug-in SPI with ready-to-use implementations for Kinesis, Apache Pulsar and others. Of course, such a runtime would also provide suitable implementations for safely persisting offsets and database history, and it would offer means of monitoring, health checks etc. That way, you could connect the Debezium source connectors with your preferred streaming platform in a robust and reliable way, without any manual coding required!

If you like this idea, then please check out JIRA issue DBZ-651 and let us know about your thoughts, e.g. by leaving a comment on the issue, in the comment section below or on our mailing list.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.


Creating DDD aggregates with Debezium and Kafka Streams

Microservice-based architectures can be considered an industry trend and are thus often found in enterprise applications these days. One possible way to keep data synchronized across multiple services and their backing data stores is to make use of an approach called change data capture, or CDC for short.

Essentially, CDC allows you to listen to any modifications occurring at one end of a data flow (i.e. the data source) and communicate them as change events to other interested parties or store them in a data sink. Instead of doing this in a point-to-point fashion, it’s advisable to decouple this flow of events between data sources and data sinks. Such a scenario can be implemented based on Debezium and Apache Kafka with relative ease and effectively no coding.

As an example, consider the following microservice-based architecture of an order management system:

Microservice-based architecture of an order management system

This system comprises three services, Order, Item and Stock. If the Order service receives an order request, it will need information from the other two, such as item definitions or the stock count for specific items. Instead of making synchronous calls to these services to obtain this information, CDC can be used to set up change event streams for the data managed by the Item and Stock services. The Order service can subscribe to these event streams and keep a local copy of the relevant item and stock data in its own database. This approach helps to decouple the services (e.g. an outage of one service has no direct impact on the others) and can also be beneficial for overall performance, as each service can hold an optimized local view of just those data items owned by other services that it is interested in.

How to Handle Aggregate Objects?

There are use cases, however, where things are a bit more tricky. It is sometimes useful to share information across services and data stores by means of so-called aggregates, which are a concept/pattern defined by domain-driven design (DDD). In general, a DDD aggregate is used to transfer state which can be composed of multiple different domain objects that are together treated as a single unit of information.

Concrete examples are:

  • customers and their addresses which are represented as a customer record aggregate storing a customer and a list of addresses

  • orders and corresponding line items which are represented as an order record aggregate storing an order and all its line items

Chances are that the data of the involved domain objects backing these DDD aggregates are stored in separate relations of an RDBMS. When making use of the CDC capabilities currently found in Debezium, all changes to domain objects will be independently captured and by default eventually reflected in separate Kafka topics, one per RDBMS relation. While this behaviour is tremendously helpful for a lot of use cases, it can be pretty limiting for others, like the DDD aggregate scenario described above. Therefore, this blog post explores how DDD aggregates can be built based on Debezium CDC events, using the Kafka Streams API.

Capturing Change Events from a Data Source

The complete source code for this blog post is provided in the Debezium examples repository on GitHub. Begin by cloning this repository and changing into the kstreams directory:

git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/kstreams

The project provides a Docker Compose file with services for all the components you may already know from the Debezium tutorial: Apache ZooKeeper, Apache Kafka, Kafka Connect and MySQL.

In addition, it declares the following services:

  • MongoDB which will be used as a data sink

  • Another Kafka Connect instance which will host the MongoDB sink connector

  • A service for running the DDD aggregation process we’re going to build in the following

We’ll get to those three in a bit; for now, let’s prepare the source side of our pipeline:

export DEBEZIUM_VERSION=0.7
docker-compose up mysql zookeeper kafka connect_source

Once all services have been started, register an instance of the Debezium MySQL connector by submitting the following JSON document:

{
    "name": "mysql-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "table.whitelist": "inventory.customers,inventory.addresses",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones":"false"
    }
}

To do so, save the document to a file named mysql-source.json and run the following curl command:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @mysql-source.json

This sets up the connector for the specified database, using the given credentials. For our purposes we’re only interested in changes to the customers and addresses tables, hence the table.whitelist property is set to select just these two tables. Another noteworthy thing is the "unwrap" transform that is applied. By default, Debezium’s CDC events would contain the old and new state of changed rows and some additional metadata on the source of the change. By applying the UnwrapFromEnvelope SMT (single message transformation), only the new state will be propagated into the corresponding Kafka topics.

We can take a look at them once the connector has been deployed and finished its initial snapshot of the two captured tables:

docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers # or dbserver1.inventory.addresses

E.g. you should see the following output (formatted and omitting the schema information for the sake of readability) for the topic with customer changes:

{
    "schema": { ... },
    "payload": {
        "id": 1001
    }
}
{
    "schema": { ... },
    "payload": {
        "id": 1001,
        "first_name": "Sally",
        "last_name": "Thomas",
        "email": "sally.thomas@acme.com"
    }
}
...

Building DDD Aggregates

The KStreams application is going to process data from the two Kafka topics. These topics receive CDC events based on the customers and addresses relations found in MySQL, each of which has its corresponding Jackson-annotated POJO (Customer and Address), enriched by a field holding the CDC event type (i.e. UPSERT/DELETE).

Since the Kafka topic records are in Debezium JSON format with unwrapped envelopes, a special SerDe has been written in order to be able to read/write these records using their POJO or Debezium event representation respectively. While the serializer simply converts the POJOs into JSON using Jackson, the deserializer is a "hybrid" one, being able to deserialize from either Debezium CDC events or jsonified POJOs.
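
The exact SerDe implementation ships with the example project; to give an idea, a simplified, hypothetical sketch of such a hybrid deserializer could look like the following (Jackson-based, implementing Kafka’s Deserializer interface; the class name is made up and details such as populating the _eventType field are glossed over):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

public class HybridJsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> targetType;

    public HybridJsonDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // tombstone record
        }
        try {
            JsonNode node = mapper.readTree(data);
            // Debezium's JSON records wrap the actual row data in a "payload" field,
            // whereas jsonified POJOs written by the streams application itself don't
            JsonNode payload = node.has("payload") ? node.get("payload") : node;
            return mapper.treeToValue(payload, targetType);
        }
        catch (IOException e) {
            throw new RuntimeException("Couldn't deserialize record from topic " + topic, e);
        }
    }

    @Override
    public void close() {
    }
}

A corresponding serializer would simply call ObjectMapper#writeValueAsBytes() on the POJO; combining the two via Serdes.serdeFrom() yields SerDes like the ones used in the topology below.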

With that in place, the KStreams topology to create and maintain DDD aggregates on-the-fly can be built as follows:

Customers Topic ("parent")

All the customer records are simply read from the customers topic into a KTable, which will automatically maintain the latest state per customer according to the record key (i.e. the customer’s PK):

KTable<DefaultId, Customer> customerTable =
        builder.table(parentTopic, Consumed.with(defaultIdSerde,customerSerde));

Addresses Topic ("children")

For the address records, the processing is a bit more involved and needs several steps. First, all the address records are read into a KStream:

KStream<DefaultId, Address> addressStream = builder.stream(childrenTopic,
        Consumed.with(defaultIdSerde, addressSerde));

Second, a 'pseudo' grouping of these address records is done based on their keys (the original primary key in the relation). During this step the relationships towards the corresponding customer records are maintained. This effectively allows keeping track of which address record belongs to which customer record, even in light of address record deletions. To achieve this, an additional LatestAddress POJO is introduced which stores the latest known PK <→ FK relation in addition to the Address record itself.

KTable<DefaultId,LatestAddress> tempTable = addressStream
        .groupByKey(Serialized.with(defaultIdSerde, addressSerde))
        .aggregate(
                () -> new LatestAddress(),
                (DefaultId addressId, Address address, LatestAddress latest) -> {
                    latest.update(
                        address, addressId, new DefaultId(address.getCustomer_id()));
                    return latest;
                },
                Materialized.<DefaultId, LatestAddress, KeyValueStore<Bytes, byte[]>>
                        as(childrenTopic + "_table_temp")
                        .withKeySerde(defaultIdSerde)
                        .withValueSerde(latestAddressSerde)
        );
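
The LatestAddress POJO itself is part of the example project; judging from its usage in the topology above, its essential shape is roughly the following (a sketch only, accessor names beyond those used above are assumptions):

public class LatestAddress {

    private Address address;
    private DefaultId addressId;
    private DefaultId customerId;

    // remember the latest state of the address together with its PK <-> FK relation
    public void update(Address address, DefaultId addressId, DefaultId customerId) {
        this.address = address;
        this.addressId = addressId;
        this.customerId = customerId;
    }

    public Address getAddress() {
        return address;
    }

    public DefaultId getAddressId() {
        return addressId;
    }

    public DefaultId getCustomerId() {
        return customerId;
    }
}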

Third, the intermediate KTable is again converted to a KStream. The LatestAddress records are transformed to have the customer id (FK relationship) as their new key in order to group them per customer. During the grouping step, customer-specific addresses are updated, which can result in an address record being added or deleted. For this purpose, another POJO called Addresses is introduced, which holds a map of address records that gets updated accordingly. The result is a KTable holding the most recent Addresses per customer id.

KTable<DefaultId, Addresses> addressTable = tempTable.toStream()
        .map((addressId, latestAddress) ->
            new KeyValue<>(latestAddress.getCustomerId(),latestAddress))
        .groupByKey(Serialized.with(defaultIdSerde,latestAddressSerde))
        .aggregate(
                () -> new Addresses(),
                (customerId, latestAddress, addresses) -> {
                    addresses.update(latestAddress);
                    return addresses;
                },
                Materialized.<DefaultId, Addresses, KeyValueStore<Bytes, byte[]>>
                        as(childrenTopic + "_table_aggregate")
                        .withKeySerde(defaultIdSerde)
                        .withValueSerde(addressesSerde)
        );
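
Similarly, the Addresses POJO essentially wraps a map of address records keyed by address id. A sketch could look like this, assuming that Address exposes the same _eventType field as Customer and that DefaultId implements equals() and hashCode():

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public class Addresses {

    private final Map<DefaultId, Address> entries = new LinkedHashMap<>();

    public void update(LatestAddress latest) {
        if (latest.getAddress().get_eventType() == EventType.DELETE) {
            // drop addresses whose latest known state is a deletion
            entries.remove(latest.getAddressId());
        }
        else {
            entries.put(latest.getAddressId(), latest.getAddress());
        }
    }

    public Collection<Address> getEntries() {
        return entries.values();
    }
}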

Combining Customers With Addresses

Finally, it’s easy to bring customers and addresses together by joining the customers KTable with the addresses KTable and thereby building the DDD aggregates, which are represented by the CustomerAddressAggregate POJO. At the end, the KTable changes are written to a KStream, which in turn gets saved into a Kafka topic. This makes it possible to use the resulting DDD aggregates in manifold ways.

KTable<DefaultId,CustomerAddressAggregate> dddAggregate =
          customerTable.join(addressTable, (customer, addresses) ->
              customer.get_eventType() == EventType.DELETE ?
                      null :
                      new CustomerAddressAggregate(customer,addresses.getEntries())
          );

  dddAggregate.toStream().to("final_ddd_aggregates",
                              Produced.with(defaultIdSerde,(Serde)aggregateSerde));

Records in the customers KTable might receive a CDC delete event. If so, this can be detected by checking the event type field of the customer POJO and e.g. returning null instead of a DDD aggregate. Such a convention can be helpful whenever consuming parties also need to act on deletions accordingly.

Running the Aggregation Pipeline

Having implemented the aggregation pipeline, it’s time to give it a test run. To do so, build the poc-ddd-aggregates Maven project which contains the complete implementation:

mvn clean package -f poc-ddd-aggregates/pom.xml

Then run the aggregator service from the Compose file which takes the JAR built by this project and launches it using the java-jboss-openjdk8-jdk base image:

docker-compose up -d aggregator

Once the aggregation pipeline is running, we can take a look at the aggregated events using the console consumer:

docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic final_ddd_aggregates

Transferring DDD Aggregates to Data Sinks

We originally set out to build these DDD aggregates in order to transfer data and synchronize changes between a data source (MySQL tables in this case) and a convenient data sink. DDD aggregates are typically complex data structures, and therefore it makes perfect sense to write them to data stores which offer flexible ways to query and/or index them. Among NoSQL databases, a document store seems the most natural choice, with MongoDB being the leading database for such use cases.

Thanks to Kafka Connect and its numerous turn-key connectors it is almost effortless to get this done. Using a MongoDB sink connector from the open-source community, it is easy to have the DDD aggregates written into MongoDB. All that’s needed is a proper configuration which can be posted to the REST API of Kafka Connect in order to run the connector.

So let’s start MongoDB and another Kafka Connect instance for hosting the sink connector:

docker-compose up -d mongodb connect_sink

If the DDD aggregates should simply be written unmodified into MongoDB, the configuration may look as simple as this:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "final_ddd_aggregates",
        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
        "mongodb.collection": "customers_with_addresses",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
        "mongodb.delete.on.null.values": true
    }
}

As with the source connector, save the configuration to a file named mongodb-sink.json and deploy the connector using curl:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8084/connectors/ -d @mongodb-sink.json

This connector will consume messages from the "final_ddd_aggregates" Kafka topic and write them as MongoDB documents into the "customers_with_addresses" collection.

You can take a look by firing up a Mongo shell and querying the collection’s contents:

docker-compose exec mongodb bash -c 'mongo inventory'

> db.customers_with_addresses.find().pretty()
{
    "_id": {
        "id": "1001"
    },
    "addresses": [
        {
            "zip": "76036",
            "_eventType": "UPSERT",
            "city": "Euless",
            "street": "3183 Moore Avenue",
            "id": "10",
            "state": "Texas",
            "customer_id": "1001",
            "type": "SHIPPING"
        },
        {
            "zip": "17116",
            "_eventType": "UPSERT",
            "city": "Harrisburg",
            "street": "2389 Hidden Valley Road",
            "id": "11",
            "state": "Pennsylvania",
            "customer_id": "1001",
            "type": "BILLING"
        }
    ],
    "customer": {
        "_eventType": "UPSERT",
        "last_name": "Thomas",
        "id": "1001",
        "first_name": "Sally",
        "email": "sally.thomas@acme.com"
    }
}

Due to the combination of the data in a single document, some parts are redundant or not needed. To get rid of any unwanted data (e.g. the _eventType and customer_id fields of each address sub-document), it would also be possible to adapt the connector configuration in order to blacklist said fields.

Finally, let’s update some customer or address data in the MySQL source database:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

mysql> update customers set first_name= "Sarah" where id = 1001;

Shortly thereafter, you should see that the corresponding aggregate document in MongoDB has been updated accordingly.

Drawbacks and Limitations

While this first version for creating DDD aggregates from table-based CDC events basically works, it is very important to understand its current limitations:

  • not generically applicable, thus requiring custom code for POJOs and intermediate types

  • cannot be scaled across multiple instances as is, due to the missing but necessary data re-partitioning prior to processing

  • limited to building aggregates based on a single JOIN between 1:N relationships

  • resulting DDD aggregates are eventually consistent, meaning that it is possible for them to temporarily exhibit intermediate state before converging

The first few can be addressed with a reasonable amount of work on the KStreams application. The last one, dealing with the eventually consistent nature of the resulting DDD aggregates, is much harder to correct and will require some efforts on Debezium’s own CDC mechanism.

Outlook

In this post we described an approach for creating aggregated events from Debezium’s CDC events. In a follow-up blog post we may dive a bit more into the topic of how to horizontally scale the DDD aggregate creation by running multiple KStreams aggregator instances. For that purpose, the data needs proper re-partitioning before running the topology. In addition, it could be interesting to look into a somewhat more generic version which only needs custom classes to describe the two main POJOs involved.

We also thought about providing a ready-to-use component which would work in a generic way (based on Connect records, i.e. not tied to a specific serialization format such as JSON) and could be set up as a configurable stand-alone process running given aggregations.

Also on the topic of dealing with eventual consistency we got some ideas, but those will need some more exploration and investigation for sure. Stay tuned!

We’d love to hear your feedback on the topic of event aggregation. If you have any ideas or thoughts on the subject, please get in touch by posting a comment below or sending a message to our mailing list.

