Automating Cache Invalidation With Change Data Capture

The second-level cache of Hibernate ORM / JPA is a proven and efficient way to increase application performance: caching read-only or rarely modified entities avoids roundtrips to the database, resulting in improved response times of the application.

Unlike the first-level cache, the second-level cache is associated with the session factory (or entity manager factory in JPA terms), so its contents are shared across transactions and concurrent sessions. Naturally, if a cached entity gets modified, the corresponding cache entry must be updated (or purged from the cache), too. As long as the data changes are done through Hibernate ORM, this is nothing to worry about: the ORM will update the cache automatically.

Things get tricky, though, when bypassing the application, e.g. when modifying records directly in the database. Hibernate ORM then has no way of knowing that the cached data has become stale, and it’s necessary to invalidate the affected items explicitly. A common way of doing so is to provide some admin functionality that allows clearing an application’s caches. For this to work, it’s vital not to forget to call that invalidation functionality, or the application will keep working with outdated cached data.
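
Such an admin operation typically boils down to a call to the JPA Cache API; here is a minimal sketch (the entity class name is just a placeholder, and you’d usually expose this via some secured admin endpoint):

// purge the entire second-level cache, e.g. triggered from an admin REST endpoint
entityManagerFactory.getCache().evictAll();

// or, more selectively, purge all cached instances of a single entity type
entityManagerFactory.getCache().evict(SomeCachedEntity.class);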

In the following we’re going to explore an alternative approach for cache invalidation, which works in a reliable and fully automated way: by employing Debezium and its change data capture (CDC) capabilities, you can track data changes in the database itself and react to any applied change. This allows you to invalidate affected cache entries in near-realtime, without the risk of stale data due to missed changes. If an entry has been evicted from the cache, Hibernate ORM will load the latest version of the entity from the database the next time it is requested.

The Example Application

As an example, consider this simple model of two entities, PurchaseOrder and Item:

Example domain model

A purchase order represents the order of an item, where its total price is the ordered quantity times the item’s base price.

Source Code

The source code of this example is provided on GitHub. If you want to follow along and try out all the steps described in the following, clone the repo and follow the instructions in README.md for building the project.

Modelling order and item as JPA entities is straightforward:

@Entity
public class PurchaseOrder {

    @Id
    @GeneratedValue(generator = "sequence")
    @SequenceGenerator(
        name = "sequence", sequenceName = "seq_po", initialValue = 1001, allocationSize = 50
    )
    private long id;
    private String customer;
    @ManyToOne private Item item;
    private int quantity;
    private BigDecimal totalPrice;

    // ...
}
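
The total price is denormalized into the order when it is placed; conceptually, the order service does something along these lines (a sketch only, the variable and accessor names are assumed rather than taken from the example project):

// load the ordered item; this may be served from the second-level cache
Item item = entityManager.find(Item.class, itemId);

PurchaseOrder order = new PurchaseOrder();
order.setCustomer(customer);
order.setItem(item);
order.setQuantity(quantity);

// total price = ordered quantity * the item's base price
order.setTotalPrice(item.getPrice().multiply(BigDecimal.valueOf(quantity)));

entityManager.persist(order);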

As changes to items are rare, the Item entity should be cached. This can be done by simply specifying JPA’s @Cacheable annotation:

@Entity
@Cacheable
public class Item {

    @Id
    private long id;
    private String description;
    private BigDecimal price;

    // ...
}

You also need to enable the second-level cache in the META-INF/persistence.xml file. The property hibernate.cache.use_second_level_cache activates the cache itself, and the ENABLE_SELECTIVE shared cache mode ensures that only entities annotated with @Cacheable are put into the cache. It’s also a good idea to enable SQL query logging and cache access statistics, so you can verify that things work as expected by examining the application log:

<?xml version="1.0" encoding="utf-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="..."
    version="2.2">

    <persistence-unit name="orders-PU-JTA" transaction-type="JTA">
        <jta-data-source>java:jboss/datasources/OrderDS</jta-data-source>
        <shared-cache-mode>ENABLE_SELECTIVE</shared-cache-mode>
        <properties>
            <property name="hibernate.cache.use_second_level_cache" value="true" />

            <property name="hibernate.show_sql" value="true" />
            <property name="hibernate.format_sql" value="true" />
            <property name="hibernate.generate_statistics" value="true" />

            <!-- dialect etc. ... -->
        </properties>
    </persistence-unit>
</persistence>

When running on a Java EE application server (or Jakarta EE, as the stack is called since its donation to the Eclipse Foundation), that’s all you need to enable second-level caching. In the case of WildFly (which is what’s used in the example project), the Infinispan key/value store is used as the cache provider by default.

Now try and see what happens when modifying an item’s price by running some SQL in the database, bypassing the application layer. If you’ve checked out the example source code, comment out the DatabaseChangeEventListener class and start the application as described in the README.md. You can then place purchase orders using curl like this (a couple of example items have been persisted at application start-up):

> curl -H "Content-Type: application/json" \
  -X POST \
  --data '{ "customer" : "Billy-Bob", "itemId" : 10003, "quantity" : 2 }' \
  http://localhost:8080/cache-invalidation/rest/orders
{
    "id" : 1002,
    "customer" : "Billy-Bob",
    "item" : {
        "id" :10003,
        "description" : "North By Northwest",
        "price" : 14.99
    },
    "quantity" : 2,
    "totalPrice" : 29.98
}

The response is the expected one, as the item price is 14.99. Now update the item’s price directly in the database. The example uses Postgres, so you can use the psql CLI utility to do so:

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"'

Placing another purchase order for the same item using curl, you’ll see that the calculated total price doesn’t reflect the update. Not good! But it’s not too surprising, given that the price update was applied completely bypassing the application layer and Hibernate ORM.
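
You can confirm that the stale value indeed comes from the second-level cache by means of the JPA Cache API, e.g. from a test or a debugging session (a sketch; 10003 is the id of the item updated above):

// the item is still present in the second-level cache, carrying the old price of 14.99
boolean stillCached = entityManagerFactory.getCache().contains(Item.class, 10003L);

// evicting it manually would force a reload from the database on the next access
entityManagerFactory.getCache().evict(Item.class, 10003L);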

The Change Event Handler

Now let’s explore how to use Debezium and CDC to react to changes in the item table and invalidate corresponding cache entries.

While Debezium is most often deployed into Kafka Connect (thus streaming change events into Apache Kafka topics), it has another mode of operation that comes in very handy for the use case at hand. Using the embedded engine, you can run the Debezium connectors as a library directly within your application. For each change event received from the database, a configured callback method will be invoked, which in the case at hand will evict the affected item from the second-level cache.

The following picture shows the design of this approach:

Architecture Overview

While this doesn’t come with the scalability and fault tolerance provided by Apache Kafka, it nicely fits the given requirements. As the second-level cache is bound to the application lifecycle, there is for instance no need for the offset management and restarting capabilities provided by the Kafka Connect framework. For the given use case it is enough to receive data change events while the application is running, and using the embedded engine enables exactly that.

Clustered Applications

Note that it still might make sense to use Apache Kafka and the regular deployment of Debezium into Kafka Connect when running a clustered application where each node has a local cache. Instead of registering a connector on each node, Kafka and Connect would allow you to deploy a single connector instance and have the application nodes listen to the topic(s) with the change events. This would result in less resource utilization in the database.

Having added the dependencies of the Debezium embedded engine (io.debezium:debezium-embedded:0.9.0.Beta1) and the Debezium Postgres connector (io.debezium:debezium-connector-postgres:0.9.0.Beta1) to your project, a class DatabaseChangeEventListener for listening to any changes in the database can be implemented like this:

@ApplicationScoped
public class DatabaseChangeEventListener {

    @Resource
    private ManagedExecutorService executorService;

    @PersistenceUnit private EntityManagerFactory emf;

    @PersistenceContext
    private EntityManager em;

    private EmbeddedEngine engine;

    public void startEmbeddedEngine(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Configuration config = Configuration.empty()
                .withSystemProperties(Function.identity()).edit()
                .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
                .with(EmbeddedEngine.ENGINE_NAME, "cache-invalidation-engine")
                .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class)
                .with("name", "cache-invalidation-connector")
                .with("database.hostname", "postgres")
                .with("database.port", 5432)
                .with("database.user", "postgresuser")
                .with("database.password", "postgrespw")
                .with("database.server.name", "dbserver1")
                .with("database.dbname", "inventory")
                .with("database.whitelist", "public")
                .with("snapshot.mode", "never")
                .build();

        this.engine = EmbeddedEngine.create()
                .using(config)
                .notifying(this::handleDbChangeEvent)
                .build();

        executorService.execute(engine);
    }

    @PreDestroy
    public void shutdownEngine() {
        engine.stop();
    }

    private void handleDbChangeEvent(SourceRecord record) {
        if (record.topic().equals("dbserver1.public.item")) {
            Long itemId = ((Struct) record.key()).getInt64("id");
            Struct payload = (Struct) record.value();
            Operation op = Operation.forCode(payload.getString("op"));

            if (op == Operation.UPDATE || op == Operation.DELETE) {
                emf.getCache().evict(Item.class, itemId);
            }
        }
    }
}

Upon application start-up, this configures an instance of the Debezium Postgres connector and sets up the embedded engine for running the connector. The connector options (host name, credentials etc.) are mostly the same as when deploying the connector into Kafka Connect. There is no need for doing an initial snapshot of the existing data, hence the snapshot mode is set to "never".

The offset storage option is used for controlling how connector offsets should be persisted. As it’s not necessary to process any change events occurring while the connector is not running (instead you’d just begin to read the log from the current location after the restart), the in-memory implementation provided by Kafka Connect is used.
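
Should you ever need offsets to survive application restarts (which is not required for this cache invalidation use case), the engine could be pointed to Kafka Connect’s file-based offset store instead; a sketch, assuming a writable path on the server:

        Configuration config = Configuration.empty()
                .withSystemProperties(Function.identity()).edit()
                // persist offsets to a file, so the connector resumes from the last recorded position
                .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class)
                .with("offset.storage.file.filename", "/deployments/cache-invalidation-offsets.dat")
                .with("offset.flush.interval.ms", 60_000)
                // ... connector options as shown above ...
                .build();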

Once configured, the embedded engine must be run via an Executor instance. As the example runs in WildFly, a managed executor can simply be obtained through @Resource injection for that purpose (see JSR 236).

The embedded engine is configured to invoke the handleDbChangeEvent() method for each received data change event. This method first checks whether the incoming event originates from the item table. If that’s the case, and if the change event represents an UPDATE or DELETE statement, the affected Item instance is evicted from the second-level cache. JPA 2.0 provides a simple API for this purpose, accessible via the EntityManagerFactory.

With the DatabaseChangeEventListener class in place, the cache entry will now automatically be evicted when doing another item update via psql. When placing the first purchase order for that item after the update, you’ll see in the application log how Hibernate ORM executes a query SELECT ... FROM item ... in order to load the item referenced by the order. Also the cache statistics will report one "L2C miss". Upon subsequent orders of that same item it will be obtained from the cache again.
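
If you prefer to verify this programmatically rather than by scanning the log, the statistics enabled in persistence.xml above can also be queried via Hibernate’s native API; a small sketch:

SessionFactory sessionFactory = entityManagerFactory.unwrap(SessionFactory.class);
Statistics statistics = sessionFactory.getStatistics();

// after the first order following the external price update there should be one additional
// miss (and put); subsequent orders of the same item should only increase the hit count
long hits = statistics.getSecondLevelCacheHitCount();
long misses = statistics.getSecondLevelCacheMissCount();
long puts = statistics.getSecondLevelCachePutCount();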

Eventual Consistency

While the event handling happens in near-realtime, it’s important to point out that it still applies eventual consistency semantics. This means that there is a very short time window between the point in time where a transaction is committed and the point in time where the change event is streamed from the log to the event handler and the cache entry is invalidated.

Avoiding Cache Invalidations After Application-triggered Data Changes

The change event listener shown above satisfies the requirement of invalidating cached items after external data changes. But in its current form it is evicting cached items a bit too aggressively: they will also be purged when updating an Item instance through the application itself. This is not only unnecessary (the cached item already is the current version), it’s even counter-productive: the superfluous cache evictions cause additional database roundtrips, resulting in longer response times.

It is therefore necessary to distinguish between data changes performed by the application itself and external data changes. Only in the latter case should the affected items be evicted from the cache. To do so, you can leverage the fact that each Debezium data change event contains the id of the originating transaction. Keeping track of all transactions run by the application itself allows triggering the cache eviction only for those items altered by external transactions.

Accounting for this change, the overall architecture looks like so:

Architecture Overview with Transaction Registry

The first thing to implement is the transaction registry, i.e. a class for the transaction book keeping:

@ApplicationScoped
public class KnownTransactions {

    private final DefaultCacheManager cacheManager;
    private final Cache<Long, Boolean> applicationTransactions;

    public KnownTransactions() {
        cacheManager = new DefaultCacheManager();
        cacheManager.defineConfiguration(
                "tx-id-cache",
                new ConfigurationBuilder()
                    .expiration()
                        .lifespan(60, TimeUnit.SECONDS)
                    .build()
                );

        applicationTransactions = cacheManager.getCache("tx-id-cache");
    }

    @PreDestroy
    public void stopCacheManager() {
        cacheManager.stop();
    }

    public void register(long txId) {
        applicationTransactions.put(txId, true);
    }

    public boolean isKnown(long txId) {
        return Boolean.TRUE.equals(applicationTransactions.get(txId));
    }
}

This uses the Infinispan DefaultCacheManager for creating and maintaining an in-memory cache of transaction ids encountered by the application. As data change events arrive in near-realtime, the TTL of the cache entries can be rather short (in fact, the value of one minute shown in the example is chosen very conservatively; usually events should be received within seconds).

The next step is to retrieve the current transaction id whenever a request is processed by the application and register it within KnownTransactions. This should happen once per transaction. There are multiple ways for implementing this logic; in the following a Hibernate ORM FlushEventListener is used for this purpose:

class TransactionRegistrationListener implements FlushEventListener {

    private volatile KnownTransactions knownTransactions;

    public TransactionRegistrationListener() {
    }

    @Override
    public void onFlush(FlushEvent event) throws HibernateException {
        event.getSession().getActionQueue().registerProcess( session -> {
            Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()")
                    .setFlushMode(FlushMode.MANUAL)
                    .getSingleResult();

            getKnownTransactions().register(txId.longValue());
        } );
    }

    private KnownTransactions getKnownTransactions() {
        KnownTransactions value = knownTransactions;

        if (value == null) {
            knownTransactions = value = CDI.current().select(KnownTransactions.class).get();
        }

        return value;
    }
}

As there’s no portable way to obtain the transaction id, this is done using a native SQL query. In the case of Postgres, the txid_current() function can be called for that. Hibernate ORM event listeners are not subject to dependency injection via CDI. Hence the static current() method is used to obtain a handle to the application’s CDI container and get a reference to the KnownTransactions bean.

This listener will be invoked whenever Hibernate ORM is synchronizing its persistence context with the database ("flushing"), which usually happens exactly once when the transaction is committed.

Manual Flushes

The session / entity manager can also be flushed manually, in which case the txid_current() function would be invoked multiple times. That’s neglected here for the sake of simplicity. The actual code in the example repo contains a slightly extended version of this class which makes sure that the transaction id is obtained only once.
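
For illustration, such a guard could look roughly like this (a sketch with made-up field names, assuming each session is used for a single transaction at a time; the actual example code solves this slightly differently):

class TransactionRegistrationListener implements FlushEventListener {

    // sessions for which the transaction id has already been registered; entries are
    // removed again once the corresponding transaction has completed
    private final Set<Object> sessionsWithRegisteredTx =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    // ...

    @Override
    public void onFlush(FlushEvent event) throws HibernateException {
        if (!sessionsWithRegisteredTx.add(event.getSession())) {
            // a manual flush within the same unit of work: the tx id was already obtained
            return;
        }

        event.getSession().getActionQueue().registerProcess( session -> {
            Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()")
                    .setFlushMode(FlushMode.MANUAL)
                    .getSingleResult();

            getKnownTransactions().register(txId.longValue());
        } );

        // clean up the book-keeping once the transaction has completed
        event.getSession().getActionQueue().registerProcess(
                (success, session) -> sessionsWithRegisteredTx.remove(session) );
    }

    // getKnownTransactions() as shown above
}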

To register the flush listener with Hibernate ORM, an Integrator implementation must be created and declared in the META-INF/services/org.hibernate.integrator.spi.Integrator file:

public class TransactionRegistrationIntegrator implements Integrator {

    @Override
    public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory,
            SessionFactoryServiceRegistry serviceRegistry) {
        serviceRegistry.getService(EventListenerRegistry.class)
            .appendListeners(EventType.FLUSH, new TransactionRegistrationListener());
    }

    @Override
    public void disintegrate(SessionFactoryImplementor sessionFactory,
            SessionFactoryServiceRegistry serviceRegistry) {
    }
}
The contents of that service file are simply the fully-qualified name of the integrator implementation:

io.debezium.examples.cacheinvalidation.persistence.TransactionRegistrationIntegrator

During bootstrap, Hibernate ORM will detect the integrator class (by means of the Java service loader), invoke its integrate() method which in turn will register the listener class for the FLUSH event.

The last step is to exclude any events stemming from transactions run by the application itself in the database change event handler:

@ApplicationScoped
public class DatabaseChangeEventListener {

    // ...

    @Inject
    private KnownTransactions knownTransactions;

    private void handleDbChangeEvent(SourceRecord record) {
        if (record.topic().equals("dbserver1.public.item")) {
            Long itemId = ((Struct) record.key()).getInt64("id");
            Struct payload = (Struct) record.value();
            Operation op = Operation.forCode(payload.getString("op"));
            Long txId = ((Struct) payload.get("source")).getInt64("txId");

            if (!knownTransactions.isKnown(txId) &&
                    (op == Operation.UPDATE || op == Operation.DELETE)) {
                emf.getCache().evict(Item.class, itemId);
            }
        }
    }
}

And with that, you have all the pieces in place: cached Items will only be evicted after external data changes, but not after changes done by the application itself. To confirm, you can invoke the example’s items resource using curl:

> curl -H "Content-Type: application/json" \
  -X PUT \
  --data '{ "description" : "North by Northwest", "price" : 20.99}' \
  http://localhost:8080/cache-invalidation/rest/items/10003

When placing the next order for the item after this update, you should see that the Item entity is obtained from the cache, i.e. the change event will not have caused the item’s cache entry to be evicted. In contrast, if you update the item’s price via psql another time, the item should be removed from the cache and the order request will produce a cache miss, followed by a SELECT against the item table in the database.

Summary

In this blog post we’ve explored how Debezium and change data capture can be employed to invalidate application-level caches after external data changes. Compared to manual cache invalidation, this approach works very reliably (by capturing changes directly from the database log, no events will be missed) and fast (cache eviction happens in near-realtime after the data changes).

As you have seen, not too much glue code is needed in order to implement this. While the shown implementation is somewhat specific to the entities of the example, it should be possible to implement the change event handler in a more generic fashion, so that it can handle a set of configured entity types (essentially, the database change listener would have to convert the primary key field(s) from the change events into the primary key type of the corresponding entities in a generic way). Also such generic implementation would have to provide the logic for obtaining the current transaction id for the most commonly used databases.
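
As a rough illustration of that idea (not part of the example project; the topic-to-entity mapping and the assumption of single-column keys are made up for this sketch), the change event handler shown above could be generalized along these lines:

    // hypothetical mapping from change event topics to the cached entity types they affect
    private static final Map<String, Class<?>> ENTITY_TYPES_BY_TOPIC = new HashMap<>();

    static {
        ENTITY_TYPES_BY_TOPIC.put("dbserver1.public.item", Item.class);
        // further cached entity types would be registered here
    }

    private void handleDbChangeEvent(SourceRecord record) {
        Class<?> entityType = ENTITY_TYPES_BY_TOPIC.get(record.topic());

        if (entityType == null) {
            return;
        }

        Struct key = (Struct) record.key();
        Struct payload = (Struct) record.value();
        Operation op = Operation.forCode(payload.getString("op"));
        Long txId = ((Struct) payload.get("source")).getInt64("txId");

        if (knownTransactions.isKnown(txId) || (op != Operation.UPDATE && op != Operation.DELETE)) {
            return;
        }

        // assuming a single-column primary key: take the value of the sole key field as-is;
        // the JPA metamodel could be used to convert it to the entity's actual id type if needed
        Object id = key.get(key.schema().fields().get(0));

        emf.getCache().evict(entityType, id);
    }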

Please let us know whether you think this would be an interesting extension to have for Debezium and Hibernate ORM. For instance this could be a new module under the Debezium umbrella, and it would also be a great project to work on, should you be interested in contributing to Debezium. If you have any thoughts on this idea, please post a comment below or reach out on our mailing list.

Many thanks to Guillaume Smet, Hans-Peter Grahsl and Jiri Pechanec for their feedback while writing this post!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.


Debezium 0.9.0.Beta1 Released

It’s my pleasure to announce the release of Debezium 0.9.0.Beta1! Oh, and to those of you who are celebrating it — Happy Thanksgiving!

This new Debezium release comes with several great improvements to our work-in-progress SQL Server connector:

  • Initial snapshots can be done using the snapshot isolation level if enabled in the DB (DBZ-941)

  • Changes to the structures of captured tables after the connector has been set up are supported now (DBZ-812)

  • New connector option decimal.handling.mode (DBZ-953) and pass-through of any database.* option to the JDBC driver (DBZ-964)

Besides that, we spent some time on supporting the latest versions of the different databases. The Debezium connectors now support Postgres 11 (DBZ-955) and MongoDB 4.0 (DBZ-974). We are also working on supporting MySQL 8.0, which should be completed in the next 0.9.x release. The Debezium container images have been updated to Kafka 2.0.1 (DBZ-979) and the Kafka Connect image now supports the STATUS_STORAGE_TOPIC environment variable, bringing consistency with CONFIG_STORAGE_TOPIC and OFFSET_STORAGE_TOPIC that already were supported before (DBZ-893).

As usual, several bugs were fixed, too. Several of them dealt with the new Antlr-based DDL parser for the MySQL connector. By now we feel confident about its implementation, so it’s the default DDL parser as of this release (DBZ-757). If you would like to continue to use the legacy parser for some reason, you can do so by setting the ddl.parser.mode connector option to "legacy". This implementation will remain available throughout the lifetime of Debezium 0.9.x and is scheduled for removal after that. So please make sure to file issues in JIRA should you run into any problems with the Antlr parser.

Overall, this release contains 21 fixes. Thanks a lot to all the community members who helped with making this happen: Anton Martynov, Deepak Barr, Grzegorz Kołakowski, Olavi Mustanoja, Renato Mefi, Sagar Rao and Shivam Sharma!

What else?

While the work towards Debezium 0.9 continues, we’ve lately been quite busy with presenting Debezium at multiple conferences. You can find the slides and recordings from Kafka Summit San Francisco and Voxxed Days Microservices on our list of online resources around Debezium.

There you can also find the links to the slides of the great talk "The Why’s and How’s of Database Streaming" by Joy Gao of WePay, one of Debezium’s earliest users, as well as the link to a blog post by Hans-Peter Grahsl about setting up a CDC pipeline from MySQL into Cosmos DB running on Azure. If you know about other great articles, session recordings or similar on Debezium and change data capture which should be added there, please let us know.



Debezium 0.9.0.Alpha2 Released

It’s my pleasure to announce the release of Debezium 0.9.0.Alpha2!

While the work on the connectors for SQL Server and Oracle continues, we decided to do another Alpha release, as lots of fixes and new features - many of them contributed by community members - have piled up, which we wanted to get into your hands as quickly as possible.

This release supports Apache Kafka 2.0, comes with support for Postgres' HSTORE column type, allows renaming and filtering fields from MongoDB change data messages and contains multiple bug fixes and performance improvements. Overall, this release contains 55 fixes (note that a few of these have been merged back to 0.8.x and are contained in earlier 0.8 releases, too).

A big "Thank You" is in order to community members Andrey Pustovetov, Artiship Artiship, Cliff Wheadon, Deepak Barr, Ian Axelrod, Liu Hanlin, Maciej Bryński, Ori Popowski, Peng Lyu, Philip Sanetra, Sagar Rao and Syed Muhammad Sufyian for their contributions to this release. We salute you!

Kafka Upgrade

Debezium runs with and has been tested on top of the recently released Apache Kafka 2.0 (DBZ-858). The widely used version Kafka 1.x continues to be supported as well.

Note that Kafka 0.10.x is not supported due to Debezium’s usage of the admin client API, which is only available in later versions. It shouldn’t be too hard to work around this, so if someone is interested in helping out, this would be a great contribution (see DBZ-883).

Support for HSTORE columns in Postgres

Postgres is an amazingly powerful and flexible RDBMS, not least due to its wide range of column types which go far beyond what’s defined by the SQL standard. One of these types is HSTORE, which is essentially a string-to-string map.

Debezium can capture changes to columns of this type now (DBZ-898). By default, the field values will be represented using Kafka Connect’s map data type. As this may not be supported by all sink connectors, you might alternatively represent them as a string-ified JSON by setting the new hstore.handling.mode connector option to json. In this case, you’d see HSTORE columns represented as values in change messages like so: { "key1" : "val1", "key2" : "val2" }.

Field filtering and renaming for MongoDB

Unlike the connectors for MySQL and Postgres, the Debezium MongoDB connector so far didn’t allow excluding individual fields of captured collections from CDC messages. Renaming fields wasn’t supported either, e.g. by means of Kafka’s ReplaceField SMT. The reason is that MongoDB doesn’t mandate a fixed schema for the documents of a given collection, so documents are represented in change messages using a single string-ified JSON field.

Thanks to the fantastic work of community member Andrey Pustovetov, this has finally changed: you can now remove given fields from the CDC messages of given collections (DBZ-633) or have them renamed (DBZ-881). Please refer to the description of the new connector options field.blacklist and field.renames in the MongoDB connector documentation to learn more.

Extended source info

Another contribution by Andrey is the new optional connector field within the source info block of CDC messages (DBZ-918). It indicates the type of source connector that produced the messages ("mysql", "postgres" etc.), which can come in handy in cases where specific semantics need to be applied on the consumer side depending on the type of source database.

Bug fixes and version upgrades

The new release contains a good number of bug fixes and other smaller improvements. Amongst them are

  • correct handling of invalid temporal default values with MySQL (DBZ-927),

  • support for table/collection names with special characters for MySQL (DBZ-878) and MongoDB (DBZ-865) and

  • fixed handling of blacklisted tables with the new Antlr-based DDL parser (DBZ-872).

Community member Ian Axelrod provided a fix for a potential performance issue, where changes to tables with TOAST columns in Postgres would cause repeated updates to the connector’s internal schema metadata, which can be a costly operation (DBZ-911). Please refer to the Postgres connector documentation for details on the new schema.refresh.mode option, which deals with this issue.

In terms of version upgrades we migrated to the latest releases of the MySQL (DBZ-763, DBZ-764) and Postgres drivers (DBZ-912). The former is part of a longer stream of work leading towards support of MySQL 8 which should be finished in one of the next Debezium releases. For Postgres we provide a Docker image with Debezium’s supported logical decoding plug-ins based on Alpine now, which might be interesting to those concerned about container size (DBZ-705).

Please see the change log for the complete list of fixed issues.

What’s next?

The work towards Debezium 0.9 continues, and we’ll focus mostly on improvements to the SQL Server and Oracle connectors. Other potential topics include support for MySQL 8 and native logical decoding as introduced with Postgres 10, which should greatly help with using the Debezium Postgres connectors in cloud environments such as Amazon RDS.

We’ll also be talking about Debezium at the following conferences:

Already last week I had the opportunity to present Debezium at JUG Saxony Day. If you are interested, you can find the (German) slideset of that talk on Speaker Deck.



Materializing Aggregate Views With Hibernate and Debezium

Updating external full text search indexes (e.g. Elasticsearch) after data changes is a very popular use case for change data capture (CDC).

As we’ve discussed in a blog post a while ago, the combination of Debezium’s CDC source connectors and Confluent’s sink connector for Elasticsearch makes it straightforward to capture data changes in MySQL, Postgres etc. and push them towards Elasticsearch in near real-time. This results in a 1:1 relationship between tables in the source database and a corresponding search index in Elasticsearch, which is perfectly fine for many use cases.

It gets more challenging though if you’d like to put entire aggregates into a single index. An example could be a customer and all their addresses; those would typically be stored in two separate tables in an RDBMS, linked by a foreign key, whereas you’d like to have just one index in Elasticsearch, containing documents of customers with their addresses embedded, allowing you to efficiently search for customers based on their address.

Following up on the KStreams-based solution we described recently, in this post we’d like to present an alternative for materializing such aggregate views, driven by the application layer.

Overview

The idea is to materialize views in a separate table in the source database, right in the moment the original data is altered.

Aggregates are serialized as JSON structures (which naturally can represent any nested object structure) and stored in a specific table. This is done within the actual transaction altering the data, which means the aggregate view is always consistent with the primary data. In particular, this approach isn’t prone to exposing intermediary aggregations, unlike the KStreams-based solution discussed in the post linked above.

The following picture shows the overall architecture:

Streaming Materialized Aggregate Views to Elasticsearch

Here the aggregate views are materialized by means of a small extension to Hibernate ORM, which stores the JSON aggregates within the source database (note that "aggregate views" can be considered conceptually the same as "materialized views" known from different RDBMS, in that they materialize the result of a "join" operation; technically, though, we’re not using the latter to store aggregate views, but a regular table). Changes to that aggregate table are then captured by Debezium and streamed to one topic per aggregate type. The Elasticsearch sink connector can subscribe to these topics and update corresponding full-text indexes.

You can find a proof-of-concept implementation (said Hibernate extension and related code) of this idea in our examples repository. Of course the general idea isn’t limited to Hibernate ORM or JPA; you could implement something similar with any other API you’re using to access your data.

Creating Aggregate Views via Hibernate ORM

For the following let’s assume we’re persisting a simple domain model (comprising a Customer entity and a few related ones like Address, (customer) Category etc.) in a database. Using Hibernate for that allows us to make the creation of aggregates fully transparent to the actual application code, by means of a Hibernate event listener. Thanks to its extensible architecture, we can plug such a listener into Hibernate just by adding it to the classpath, from where it will be picked up automatically when bootstrapping the entity manager / session factory.

Our example listener reacts to an annotation, @MaterializeAggregate, which marks those entity types that should be the roots of materialized aggregates.

@Entity
@MaterializeAggregate(aggregateName="customers-complete")
public class Customer {

    @Id
    private long id;

    private String firstName;

    @OneToMany(mappedBy = "customer", fetch = FetchType.EAGER, cascade = CascadeType.ALL)
    private Set<Address> addresses;

    @ManyToOne
    private Category category;

    ...
}

Now if any entity annotated with @MaterializeAggregate is inserted, updated or deleted via Hibernate, the listener will kick in and materialize a JSON view of the aggregate root (customer) and its associated entities (addresses, category).

Under the hood the Jackson API is used for serializing the model into JSON. This means you can use any of its annotations to customize the JSON output, e.g. @JsonIgnore to exclude the inverse relationship from Address to Customer:

@Entity
public class Address {

    @Id
    private long id;

    @ManyToOne
    @JoinColumn(name = "customer_id")
    @JsonIgnore
    private Customer customer;

    private String street;

    private String city;

    ...
}

Note that Address itself isn’t marked with @MaterializeAggregate, i.e. it won’t be materialized into an aggregate view by itself.
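
Conceptually, what the listener does for each changed aggregate root boils down to something like the following (a simplified sketch; Aggregate is a hypothetical entity mapped to the aggregates table, the getId() accessor is assumed, and schema derivation as well as error handling are left out):

private void materializeAggregate(EntityManager entityManager, Customer customer)
        throws JsonProcessingException {

    ObjectMapper mapper = new ObjectMapper();

    // the key of the aggregate, e.g. { "id" : 1004 }
    String rootId = mapper.writeValueAsString(Collections.singletonMap("id", customer.getId()));

    // serializes the customer and everything reachable from it (addresses, category, ...),
    // honouring Jackson annotations such as @JsonIgnore on the Address#customer back-reference
    String materialization = mapper.writeValueAsString(customer);

    // insert-or-update semantics: exactly one row per aggregate root in the aggregates table
    entityManager.merge(new Aggregate("customers-complete", rootId, materialization));
}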

After using JPA’s EntityManager to insert or update a few customers, let’s take a look at the aggregates table which has been populated by the listener (value schema omitted for the sake of brevity):

> select * from aggregates;

| rootType | keySchema | rootId | valueSchema | materialization |

| customers-complete

| {
  "schema" : {
    "type" : "struct",
    "fields" : [ {
      "type" : "int64",
      "optional" : false,
      "field" : "id"
    } ],
    "optional" : false,
    "name" : "customers-complete.Key"
  }
}

| { "id" : 1004 }

| { "schema" : { ... } }

| {
  "id" : 1004,
  "firstName" : "Anne",
  "lastName" : "Kretchmar",
  "email" : "annek@noanswer.org",
  "tags" : [ "long-term", "vip" ],
  "birthday" : 5098,
  "category" : {
    "id" : 100001,
    "name" : "Retail"
  },
  "addresses" : [ {
    "id" : 16,
    "street" : "1289 University Hill Road",
    "city" : "Canehill",
    "state" : "Arkansas",
    "zip" : "72717",
    "type" : "SHIPPING"
  } ]
} |

The table contains these columns:

  • rootType: The name of the aggregate as given in the @MaterializeAggregate annotation

  • rootId: The aggregate’s id as serialized JSON

  • materialization: The aggregate itself as serialized JSON; in this case a customer and their addresses, category etc.

  • keySchema: The Kafka Connect schema of the row’s key

  • valueSchema: The Kafka Connect schema of the materialization

Let’s talk about the two schema columns for a bit. JSON itself is quite limited as far as its supported data types are concerned. For instance, we’d lose information about a numeric field’s value range (int vs. long etc.) without any additional information. Therefore the listener derives the corresponding schema information for key and aggregate view from the entity model and stores it within the aggregate records.

Now Jackson itself only supports JSON Schema, which would be a bit too limited for our purposes. Hence the example implementation provides custom serializers for Jackson’s schema system, which allow us to emit Kafka Connect’s schema representation (with more precise type information) instead of plain JSON Schema. This will come in handy in the following when we’d like to expand the string-based JSON representations of key and value into properly typed Kafka Connect records.
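
For instance, the key schema shown in the listing above corresponds to a Kafka Connect schema that could be assembled programmatically like so (a sketch using Connect’s SchemaBuilder API, not the actual serializer code of the example):

// a Connect schema equivalent to the keySchema column shown above
Schema keySchema = SchemaBuilder.struct()
        .name("customers-complete.Key")
        .field("id", Schema.INT64_SCHEMA)
        .build();

// ... and a matching key struct for customer 1004
Struct key = new Struct(keySchema).put("id", 1004L);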

Capturing Changes to the Aggregate Table

We now have a mechanism in place which transparently persists aggregates into a separate table within the source database, whenever the application data is changed through Hibernate. Note that this happens within the boundaries of the source transaction, so if that transaction is rolled back for some reason, the aggregate view will not be updated either.

The Hibernate listener uses insert-or-update semantics when writing an aggregate view, i.e. for a given aggregate root there’ll always be exactly one corresponding entry in the aggregate table which reflects its current state. If an aggregate root entity is deleted, the listener will also drop the entry from the aggregate table.

So let’s set up Debezium now to capture any changes to the aggregates table:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<-EOF
  {
      "name": "inventory-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "debezium",
          "database.password": "dbz",
          "database.server.id": "184054",
          "database.server.name": "dbserver1",
          "database.whitelist": "inventory",
          "table.whitelist": ".*aggregates",
          "database.history.kafka.bootstrap.servers": "kafka:9092",
          "database.history.kafka.topic": "schema-changes.inventory"
      }
  }
EOF

This registers the MySQL connector with the "inventory" database (we’re using an expanded version of the schema from the Debezium tutorial), capturing any changes to the "aggregates" table.

Expanding JSON

If we now were to browse the corresponding Kafka topic, we’d see data change events in the known Debezium format for all the changes to the aggregates table.

The "materialization" field with the records' "after" state still is a single field containing a JSON string, though. What we’d rather like to have is a strongly typed Kafka Connect record, whose schema exactly describes the aggregate structure and the types of its fields. For that purpose the example project provides an SMT (single message transform) which takes the JSON materialization and the corresponding valueSchema and converts this into a full-blown Kafka Connect record. The same is done for keys. DELETE events are rewritten into tombstone events. Finally, the SMT re-routes every record to a topic named after the aggregate root, allowing consumers to subscribe just to changes to specific aggregate types.

So let’s add that SMT when registering the Debezium CDC connector:

...
"transforms":"expandjson",
"transforms.expandjson.type":"io.debezium.aggregation.smt.ExpandJsonSmt",
...

When now browsing the "customers-complete" topic, we’ll see the strongly typed Kafka Connect records we’d expect:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int64",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false,
        "name": "customers-complete.Key"
    },
    "payload": {
        "id": 1004
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [ ... ],
        "optional": true,
        "name": "urn:jsonschema:com:example:domain:Customer"
    },
    "payload": {
        "id": 1004,
        "firstName": "Anne",
        "lastName": "Kretchmar",
        "email": "annek@noanswer.org",
        "active": true,
        "tags" : [ "long-term", "vip" ],
        "birthday" : 5098,
        "category": {
            "id": 100001,
            "name": "Retail"
        },
        "addresses": [
            {
                "id": 16,
                "street": "1289 University Hill Road",
                "city": "Canehill",
                "state": "Arkansas",
                "zip": "72717",
                "type": "LIVING"
            }
        ]
    }
}

To confirm that these are actual typed Kafka Connect records and not just a single JSON string field, you could for instance use the Avro message converter and examine the message schemas in the schema registry.

Sinking Aggregate Messages Into Elasticsearch

The last missing step is to register the Confluent Elasticsearch sink connector, hooking it up with the "customers-complete" topic and letting it push any changes to the corresponding index:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<-EOF
  {
      "name": "es-customers",
      "config": {
          "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
          "tasks.max": "1",
          "topics": "customers-complete",
          "connection.url": "http://elastic:9200",
          "key.ignore": "false",
          "schema.ignore" : "false",
          "behavior.on.null.values" : "delete",
          "type.name": "customer-with-addresses",
          "transforms" : "key",
          "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.key.field": "id"
      }
  }
EOF

This uses Connect’s ExtractField transformation to obtain just the actual id value from the key struct and use it as key for the corresponding Elasticsearch documents. Specifying the "behavior.on.null.values" option will let the connector delete the corresponding document from the index when encountering a tombstone message (i.e. a message with a key but without value).

Finally, we can use the Elasticsearch REST API to browse the index and of course use its powerful full-text query language to find customers by the address or any other property embedded into the aggregate structure:

> curl -X GET -H "Accept:application/json" \
  http://localhost:9200/customers-complete/_search?pretty

  {
      "_shards": {
          "failed": 0,
          "successful": 5,
          "total": 5
      },
      "hits": {
          "hits": [
              {
                  "_id": "1004",
                  "_index": "customers-complete",
                  "_score": 1.0,
                  "_source": {
                      "active": true,
                      "addresses": [
                          {
                              "city": "Canehill",
                              "id": 16,
                              "state": "Arkansas",
                              "street": "1289 University Hill Road",
                              "type": "LIVING",
                              "zip": "72717"
                          }
                      ],
                      "tags" : [ "long-term", "vip" ],
                      "birthday" : 5098,
                      "category": {
                          "id": 100001,
                          "name": "Retail"
                      },
                      "email": "annek@noanswer.org",
                      "firstName": "Anne",
                      "id": 1004,
                      "lastName": "Kretchmar",
                      "scores": [],
                      "someBlob": null,
                      "tags": []
                  },
                  "_type": "customer-with-addresses"
              }
          ],
          "max_score": 1.0,
          "total": 1
      },
      "timed_out": false,
      "took": 11
  }

And there you have it: a customer’s complete data, including their addresses, categories, tags etc., materialized into a single document within Elasticsearch. If you’re using JPA to update the customer, you’ll see the data in the index being updated accordingly in near-realtime.

Pros and Cons

So what are the advantages and disadvantages of this approach for materializing aggregates from multiple source tables compared to the KStreams-based approach?

The big advantage is consistency and awareness of transactional boundaries, whereas the KStreams-based solution in its suggested form was prone to exposing intermediary aggregates. For instance, if you’re storing a customer and three addresses, it might happen that the streaming query first creates an aggregation of the customer and the two addresses inserted first, and shortly thereafter the complete aggregate with all three addresses. This is not the case for the approach discussed here, as you’ll only ever stream complete aggregates to Kafka. Also this approach feels a bit more "light-weight", i.e. a simple marker annotation (together with some Jackson annotations for fine-tuning the emitted JSON structures) is enough in order to materialize aggregates from your domain model, whereas some more effort was needed to set up the required streams, temporary tables etc. with the KStreams solution.

The downside of driving aggregations through the application layer is that it’s not fully agnostic to the way you access the primary data. If you bypass the application, e.g. by patching data directly in the database, these updates will naturally be missed, requiring a refresh of the affected aggregates. That said, this again could be done through change data capture and Debezium: change events to source tables could be captured and consumed by the application itself, allowing it to re-materialize aggregates after external data changes. You also might argue that running JSON serializations within source transactions and storing aggregates within the source database represents some overhead. This often may be acceptable, though.

Another question to ask is what’s the advantage of using change data capture on an intermediary aggregate table over simply posting REST requests to Elasticsearch. The answer is the highly increased robustness and fault tolerance. If the Elasticsearch cluster can’t be accessed for some reason, the machinery of Kafka and Kafka Connect will ensure that any change events will be propagated eventually, once the sink is up again. Also other consumers than Elasticsearch can subscribe to the aggregate topic, the log can be replayed from the beginning etc.

Note that while we’ve been talking primarily about using Elasticsearch as a data sink, there are also other datastores and connectors that support complexly structured records. One example would be MongoDB and the sink connector maintained by Hans-Peter Grahsl, which one could use to sink customer aggregates into MongoDB, for instance enabling efficient retrieval of a customer and all their associated data with a single primary key look-up.

Outlook

The Hibernate ORM extension as well as the SMT discussed in this post can be found in our examples repository. They should be considered to be at "proof-of-concept" level currently.

That being said, we’re considering making this a proper Debezium component, allowing you to employ this aggregation approach within your Hibernate-based applications just by pulling in this new component. For that we’d have to improve a few things first, though. Most importantly, an API is needed which will let you (re-)create aggregates on demand, e.g. for existing data or for data updated by bulk updates via the Criteria API / JPQL (which will be missed by listeners). Also aggregates should be re-created automatically, if any of the referenced entities change (with the current PoC, only a change to the customer instance itself will trigger its aggregate view to be rebuilt, but not a change to one of its addresses).

If you like this idea, then let us know about it, so we can gauge the general interest in this. Also, this would be a great item to work on, if you’re interested in contributing to the Debezium project. Looking forward to hearing from you, e.g. in the comment section below or on our mailing list.

Thanks a lot to Hans-Peter Grahsl for his feedback on an earlier version of this post!



Debezium 0.8.3.Final Released

As temperatures are cooling off, the Debezium team is getting into full swing again and we’re happy to announce the release of Debezium 0.8.3.Final!

This is a bugfix release to the current stable release line of Debezium, 0.8.x, while the work on Debezium 0.9 goes on in parallel. There are 14 fixes in this release. As in earlier 0.8.x releases, we’ve further improved the new Antlr-based DDL parser used by the MySQL connector (see DBZ-901, DBZ-903 and DBZ-910).

The Postgres connector saw a huge improvement to its start-up time for databases with lots of custom types (DBZ-899). The user reporting this issue had nearly 200K entries in pg_catalog.pg_type, and due to an N + 1 SELECT issue within the Postgres driver itself, this caused the connector to take 24 minutes to start. By using a custom query for obtaining the type metadata, we were able to cut down this time to 5 seconds! Right now we’re working with the maintainers of the Postgres driver to get this issue fixed upstream, too.

More Flexible Propagation of DELETEs

Besides those bug fixes we decided to also merge one new feature from the 0.9.x branch into the 0.8.3.Final release, which those of you who are using the SMT for extracting the "after" state from change events may find useful (DBZ-857).

This SMT can be employed to stream changes to sink connectors which expect just a "flat" row representation of data instead of Debezium’s complex event structure. Not all sink connectors support the handling of deletions, though. E.g. some connectors will fail when encountering tombstone events. Therefore the SMT can now optionally rewrite delete events into updates of a special "deleted" marker field.

For that, set the delete.handling.mode option of the SMT to "rewrite":

...
"transforms" : "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.delete.handling.mode" : "rewrite",
...

When a DELETE event is propagated, the "__deleted" field of outgoing records will be set to true. So when for instance consuming the events with the JDBC sink connector, you’d see this being reflected in a corresponding column in the sink tables:

 __deleted | last_name |  id  | first_name |         email
-----------+-----------+------+------------+-----------------------
 false     | Thomas    | 1001 | Sally      | sally.thomas@acme.com
 false     | Bailey    | 1002 | George     | gbailey@foobar.com
 false     | Kretchmar | 1004 | Anne       | annek@noanswer.org
 true      | Walker    | 1003 | Edward     | ed@walker.com

You can then, for instance, use a batch job running on your sink to remove all records flagged as deleted.

What’s next?

We’re continuing the work on Debezium 0.9, which will mostly be about improvements to the SQL Server and Oracle connectors. The current plan is to do the next 0.9 release (either Alpha2 or Beta1) in two weeks from now.

Also it’s the beginning of the conference season, so we’ll spend some time with preparing demos and presenting Debezium at multiple locations. There will be sessions on change data capture with Debezium at these conferences:

If you are at any of these conferences, come and say Hi; we’d love to exchange with you about your use cases, feature requests, feedback on our roadmap and any other ideas around Debezium.

Finally, a big "Thank You" goes to our fantastic community members Andrey Pustovetov, Maciej Bryński and Peng Lyu for their contributions to this release!


