Reliable Microservices Data Exchange With the Outbox Pattern

As part of their business logic, microservices often not only have to update their own local data store, but they also need to notify other services about data changes that happened. The outbox pattern describes an approach for letting services execute these two tasks in a safe and consistent manner; it provides source services with instant "read your own writes" semantics, while offering reliable, eventually consistent data exchange across service boundaries.

If you’ve built a couple of microservices, you’ll probably agree that the hardest part about them is data: microservices don’t exist in isolation and very often they need to propagate data and data changes amongst each other.

For instance consider a microservice that manages purchase orders: when a new order is placed, information about that order may have to be relayed to a shipment service (so it can assemble shipments of one or more orders) and a customer service (so it can update things like the customer’s total credit balance based on the new order).

There are different approaches for letting the order service inform the other two services about new purchase orders; e.g. it could invoke some REST, gRPC or other (synchronous) API provided by these services. This might create some undesired coupling, though: the sending service must know which other services to invoke and where to find them. It also must be prepared for these services temporarily not being available. Service meshes such as Istio can help here, by providing capabilities like request routing, retries, circuit breakers and much more.

The general issue of any synchronous approach is that one service cannot really function without the other services it invokes. While buffering and retrying might help in cases where other services only need to be notified of certain events, this is not the case if a service actually needs to query other services for information. For instance, when a purchase order is placed, the order service might need to obtain from an inventory service the information how many units of the purchased item are in stock.

Another downside of such a synchronous approach is that it lacks re-playability, i.e. the possibility for new consumers to arrive after events have been sent and still be able to consume the entire event stream from the beginning.

Both problems can be addressed by using an asynchronous data exchange approach instead: i.e. having the order, inventory and other services propagate events through a durable message log such as Apache Kafka. By subscribing to these event streams, each service will be notified about the data changes of other services. It can react to these events and, if needed, create a local representation of that data in its own data store, using a representation tailored towards its own needs. For instance, such a view might be denormalized to efficiently support specific access patterns, or it may only contain a subset of the original data that’s relevant to the consuming service.

Durable logs also support re-playability, i.e. new consumers can be added as needed, enabling use cases you might not have had in mind originally, and without touching the source service. E.g. consider a data warehouse which should keep information about all the orders ever placed, or some full-text search functionality on purchase orders based on Elasticsearch. Once the purchase order events are in a Kafka topic (Kafka’s topic retention settings can be used to ensure that events remain in a topic as long as they are needed for the given use cases and business requirements), new consumers can subscribe, process the topic from the very beginning and materialize a view of all the data in a microservice’s database, search index, data warehouse etc.

Dealing with Topic Growth

Depending on the amount of data (number and size of records, frequency of changes), it may or may not be feasible to keep events in topics for a long or even indefinite time. Very often, some or even all events pertaining to a given data item (e.g. a specific purchase order) might be eligible for deletion from a business point of view after a given point in time. See the box "Deletion of Events from Kafka Topics" further below for some more thoughts on the deletion of events from Kafka topics in order to keep their size within bounds.

The Issue of Dual Writes

In order to provide their functionality, microservices will typically have their own local data store. For instance, the order service may use a relational database to persist the information about purchase orders. When a new order is placed, this may result in an INSERT operation in a table PurchaseOrder in the service’s database. At the same time, the service may wish to send an event about the new order to Apache Kafka, so as to propagate that information to other interested services.

Simply issuing these two requests may lead to potential inconsistencies, though. The reason is that we cannot have one shared transaction spanning the service’s database as well as Apache Kafka, as the latter doesn’t support being enlisted in distributed (XA) transactions. So in unfortunate circumstances it might happen that we end up having the new purchase order persisted in the local database, but not having sent the corresponding message to Kafka (e.g. due to some networking issue). Or, the other way around, we might have sent the message to Kafka but failed to persist the purchase order in the local database. Both situations are undesirable; this may cause no shipment to be created for a seemingly successfully placed order. Or a shipment gets created, but then there’d be no trace of the corresponding purchase order in the order service itself.

So how can this situation be avoided? The answer is to only modify one of the two resources (the database or Apache Kafka) and drive the update of the second one based on that, in an eventually consistent manner. Let’s first consider the case of only writing to Apache Kafka.

When receiving a new purchase order, the order service would not do the INSERT into its database synchronously; instead, it would only send an event describing the new order to a Kafka topic. So only one resource gets modified at a time, and if something goes wrong with that, we’ll find out about it instantly and report back to the caller of the order service that the request failed.

At the same time, the service itself would subscribe to that Kafka topic. That way, it will be notified when a new message arrives in the topic and it can persist the new purchase order in its database. There’s one subtle challenge here, though, and that is the lack of "read your own writes" semantics. E.g. let’s assume the order service also has an API for searching for all the purchase orders of a given customer. When invoking that API right after placing a new order, due to the asynchronous nature of processing messages from the Kafka topic, it might happen that the purchase order has not yet been persisted in the service’s database and thus will not be returned by that query. That can lead to a very confusing user experience, as users for instance may miss newly placed orders in their shopping history. There are ways to deal with this situation, e.g. the service could keep newly placed purchase orders in memory and answer subsequent queries based on that. This quickly gets non-trivial, though, when implementing more complex queries or considering that the order service might also comprise multiple nodes in a clustered set-up, which would require propagation of that data within the cluster.

Now how would things look when only writing to the database synchronously and driving the export of messages to Apache Kafka based on that? This is where the outbox pattern comes in.

The Outbox Pattern

The idea of this approach is to have an "outbox" table in the service’s database. When receiving a request for placing a purchase order, not only an INSERT into the PurchaseOrder table is done, but, as part of the same transaction, also a record representing the event to be sent is inserted into that outbox table.
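
Conceptually, that transaction could look like this on the SQL level (just a rough sketch; the outbox column names follow the structure discussed later in this post, while the purchase order columns are simplified for the purpose of this illustration):

BEGIN;

-- the state change to the service's own domain data
INSERT INTO purchaseorder (id, customerid, orderdate)
VALUES (4, 123, '2019-01-31 12:13:01');

-- the event describing that change, written within the very same transaction
INSERT INTO outboxevent (id, aggregatetype, aggregateid, type, payload)
VALUES (
    '7f0e2c4a-2f2e-4a3b-9d4b-5a1d2c3e4f50',
    'Order',
    '4',
    'OrderCreated',
    '{"id": 4, "customerId": 123, "orderDate": "2019-01-31T12:13:01"}'
);

COMMIT;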

The record describes an event that happened in the service, for instance it could be a JSON structure representing the fact that a new purchase order has been placed, comprising data on the order itself, its order lines as well as contextual information such as a use case identifier. By explicitly emitting events via records in the outbox table, it can be ensured that events are structured in a way suitable for external consumers. This also helps to make sure that event consumers won’t break when for instance altering the internal domain model or the PurchaseOrder table.

An asynchronous process monitors that table for new entries. If there are any, it propagates the events as messages to Apache Kafka. This gives us a very nice balance of characteristics: By synchronously writing to the PurchaseOrder table, the source service benefits from "read your own writes" semantics. A subsequent query for purchase orders will return the newly persisted order, as soon as that first transaction has been committed. At the same time, we get reliable, asynchronous, eventually consistent data propagation to other services via Apache Kafka.

Now, the outbox pattern isn’t actually a new idea. It has been in use for quite some time. In fact, even when using JMS-style message brokers, which actually can participate in distributed transactions, it can be preferable to avoid any coupling to, and potential impact from downtimes of, remote resources such as a message broker. You can also find a description of the pattern on Chris Richardson’s excellent microservices.io site.

Nevertheless, the pattern gets much less attention than it deserves and it is especially useful in the context of microservices. As we’ll see, the outbox pattern can be implemented in a very elegant and efficient way using change data capture and Debezium. In the following, let’s explore how.

An Implementation Based on Change Data Capture

Log-based Change Data Capture (CDC) is a great fit for capturing new entries in the outbox table and streaming them to Apache Kafka. As opposed to any polling-based approach, event capture happens with very low overhead in near-realtime. Debezium comes with CDC connectors for several databases such as MySQL, Postgres and SQL Server. The following example will use the Debezium connector for Postgres.

You can find the complete source code of the example on GitHub. Refer to the README.md for details on building and running the example code. The example is centered around two microservices, order-service and shipment-service. Both are implemented in Java, using CDI as the component model and JPA/Hibernate for accessing their respective databases. The order service runs on WildFly and exposes a simple REST API for placing purchase orders and canceling specific order lines. It uses a Postgres database as its local data store. The shipment service is based on Thorntail; via Apache Kafka, it receives events exported by the order service and creates corresponding shipment entries in its own MySQL database. Debezium tails the transaction log ("write-ahead log", WAL) of the order service’s Postgres database in order to capture any new events in the outbox table and propagates them to Apache Kafka.

The overall architecture of the solution can be seen in the following picture:

Outbox Pattern Overview

Note that the pattern is in no way tied to these specific implementation choices. It could equally well be realized using alternative technologies such as Spring Boot (e.g. leveraging Spring Data’s support for domain events), plain JDBC or other programming languages than Java altogether.

Now let’s take a closer look at some of the relevant components of the solution.

The Outbox Table

The outbox table resides in the database of the order service and has the following structure:

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  | not null

Its columns are these:

  • id: unique id of each message; can be used by consumers to detect any duplicate events, e.g. when restarting to read messages after a failure. Generated when creating a new event.

  • aggregatetype: the type of the aggregate root to which a given event is related; the idea being, leaning on the same concept of domain-driven design, that exported events should refer to an aggregate ("a cluster of domain objects that can be treated as a single unit"), where the aggregate root provides the sole entry point for accessing any of the entities within the aggregate. This could for instance be "purchase order" or "customer".

    This value will be used to route events to corresponding topics in Kafka, so there’d be a topic for all events related to purchase orders, one topic for all customer-related events etc. Note that events pertaining to a child entity contained within one such aggregate should also use that same type. So e.g. an event representing the cancelation of an individual order line (which is part of the purchase order aggregate) should also use the type of its aggregate root, "order", ensuring that this event, too, will go into the "order" Kafka topic.

  • aggregateid: the id of the aggregate root that is affected by a given event; this could for instance be the id of a purchase order or a customer id. Similar to the aggregate type, events pertaining to a sub-entity contained within an aggregate should use the id of the containing aggregate root, e.g. the purchase order id for an order line cancelation event. This id will be used as the key for Kafka messages later on. That way, all events pertaining to one aggregate root or any of its contained sub-entities will go into the same partition of that Kafka topic, which ensures that consumers of that topic will consume all the events related to one and the same aggregate in the exact order as they were produced.

  • type: the type of event, e.g. "Order Created" or "Order Line Canceled". Allows consumers to trigger suitable event handlers.

  • payload: a JSON structure with the actual event contents, e.g. containing a purchase order, information about the purchaser, contained order lines, their price etc.

Sending Events to the Outbox

In order to "send" events to the outbox, code in the order service could in general just do an INSERT into the outbox table. However, it’s a good idea to go for a slightly more abstract API, allowing to adjust implementation details of the outbox later on more easily, if needed. CDI events come in very handy for this. They can be raised in the application code and will be processed synchronously by the outbox event sender, which will do the required INSERT into the outbox table.

All outbox event types should implement the following contract, resembling the structure of the outbox table shown before:

public interface ExportedEvent {

    String getAggregateId();
    String getAggregateType();
    JsonNode getPayload();
    String getType();
}

To produce such an event, application code uses an injected Event instance, as e.g. here in the OrderService class:

@ApplicationScoped
public class OrderService {

    @PersistenceContext
    private EntityManager entityManager;

    @Inject
    private Event<ExportedEvent> event;

    @Transactional
    public PurchaseOrder addOrder(PurchaseOrder order) {
        order = entityManager.merge(order);

        event.fire(OrderCreatedEvent.of(order));
        event.fire(InvoiceCreatedEvent.of(order));

        return order;
    }

    @Transactional
    public PurchaseOrder updateOrderLine(long orderId, long orderLineId,
            OrderLineStatus newStatus) {
        // ...
    }
}

In the addOrder() method, the JPA entity manager is used to persist the incoming order in the database and the injected event is used to fire a corresponding OrderCreatedEvent and an InvoiceCreatedEvent. Again, keep in mind that, despite the notion of "event", these two things happen within one and the same transaction, i.e. within this transaction, three records will be inserted into the database: one in the table with purchase orders and two in the outbox table.

Actual event implementations are straightforward; as an example, here’s the OrderCreatedEvent class:

public class OrderCreatedEvent implements ExportedEvent {

    private static ObjectMapper mapper = new ObjectMapper();

    private final long id;
    private final JsonNode order;

    private OrderCreatedEvent(long id, JsonNode order) {
        this.id = id;
        this.order = order;
    }

    public static OrderCreatedEvent of(PurchaseOrder order) {
        ObjectNode asJson = mapper.createObjectNode()
                .put("id", order.getId())
                .put("customerId", order.getCustomerId())
                .put("orderDate", order.getOrderDate().toString());

        ArrayNode items = asJson.putArray("lineItems");

        for (OrderLine orderLine : order.getLineItems()) {
            items.add(
                    mapper.createObjectNode()
                            .put("id", orderLine.getId())
                            .put("item", orderLine.getItem())
                            .put("quantity", orderLine.getQuantity())
                            .put("totalPrice", orderLine.getTotalPrice())
                            .put("status", orderLine.getStatus().name())
            );
        }

        return new OrderCreatedEvent(order.getId(), asJson);
    }

    @Override
    public String getAggregateId() {
        return String.valueOf(id);
    }

    @Override
    public String getAggregateType() {
        return "Order";
    }

    @Override
    public String getType() {
        return "OrderCreated";
    }

    @Override
    public JsonNode getPayload() {
        return order;
    }
}

Note how Jackson’s ObjectMapper is used to create a JSON representation of the event’s payload.

Now let’s take a look at the code that consumes any fired ExportedEvent and does the corresponding write to the outbox table:

@ApplicationScoped
public class EventSender {

    @PersistenceContext
    private EntityManager entityManager;

    public void onExportedEvent(@Observes ExportedEvent event) {
        OutboxEvent outboxEvent = new OutboxEvent(
                event.getAggregateType(),
                event.getAggregateId(),
                event.getType(),
                event.getPayload()
        );

        entityManager.persist(outboxEvent);
        entityManager.remove(outboxEvent);
    }
}

It’s rather simple: for each event the CDI runtime will invoke the onExportedEvent() method. An instance of the OutboxEvent entity is persisted in the database — and removed right away!

This might be surprising at first. But it makes sense when remembering how log-based CDC works: it doesn’t examine the actual contents of the table in the database, but instead it tails the append-only transaction log. The calls to persist() and remove() will create an INSERT and a DELETE entry in the log once the transaction commits. After that, Debezium will process these events: for any INSERT, a message with the event’s payload will be sent to Apache Kafka. DELETE events on the other hand can be ignored, as the removal from the outbox table is a mere technicality that doesn’t require any propagation to the message broker. So we are able to capture the event added to the outbox table by means of CDC, but when looking at the contents of the table itself, it will always be empty. This means that no additional disk space is needed for the table (apart from the log file elements which will automatically be discarded at some point) and also no separate house-keeping process is required to stop it from growing indefinitely.
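
For completeness, the OutboxEvent entity referenced above could look roughly like the following sketch; the actual class in the example repository may differ, in particular regarding the mapping of the JSONB payload column, which typically requires a custom Hibernate type:

@Entity
public class OutboxEvent {

    @Id
    private UUID id;

    private String aggregateType;
    private String aggregateId;
    private String type;

    // simplified here as a plain String; mapping it to the JSONB column
    // usually requires a custom Hibernate type
    private String payload;

    OutboxEvent() {
    }

    public OutboxEvent(String aggregateType, String aggregateId, String type, JsonNode payload) {
        this.id = UUID.randomUUID();
        this.aggregateType = aggregateType;
        this.aggregateId = aggregateId;
        this.type = type;
        this.payload = payload.toString();
    }

    // getters omitted for brevity
}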

Registering the Debezium Connector

With the outbox implementation in place, it’s time to register the Debezium Postgres connector, so it can capture any new events in the outbox table and relay them to Apache Kafka. That can be done by POST-ing the following JSON request to the REST API of Kafka Connect:

{
    "name": "outbox-connector",
    "config": {
        "connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max" : "1",
        "database.hostname" : "order-db",
        "database.port" : "5432",
        "database.user" : "postgresuser",
        "database.password" : "postgrespw",
        "database.dbname" : "orderdb",
        "database.server.name" : "dbserver1",
        "schema.whitelist" : "inventory",
        "table.whitelist" : "inventory.outboxevent",
        "tombstones.on.delete" : "false",
        "transforms" : "router",
        "transforms.router.type" : "io.debezium.examples.outbox.routingsmt.EventRouter"
    }
}
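
Assuming that request body is stored in a file named register-outbox-connector.json (the file name is just an example) and that Kafka Connect’s REST API is listening on its default port 8083, the registration could be done like this:

curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ -d @register-outbox-connector.json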

This sets up an instance of io.debezium.connector.postgresql.PostgresConnector, capturing changes from the specified Postgres instance. Note that by means of a table whitelist, only changes from the outboxevent table are captured. It also applies a single message transform (SMT) named EventRouter.

Deletion of Events from Kafka Topics

By setting the tombstones.on.delete option to false, no deletion markers ("tombstones") will be emitted by the connector when an event record gets deleted from the outbox table. That makes sense, as the deletion from the outbox table shouldn’t affect the retention of events in the corresponding Kafka topics. Instead, a specific retention time for the event topics may be configured in Kafka, e.g. to retain all purchase order events for 30 days.
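
For instance, a retention time of 30 days for the OrderEvents topic could be configured like so (just a sketch; depending on the Kafka version, the tooling may expect --zookeeper instead of --bootstrap-server):

kafka-configs.sh --bootstrap-server kafka:9092 --alter \
  --entity-type topics --entity-name OrderEvents \
  --add-config retention.ms=2592000000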

Alternatively, one could work with compacted topics. This would require some changes to the design of events in the outbox table:

  • they must describe the entire aggregate; so for instance also an event representing the cancelation of a single order line should describe the complete current state of the containing purchase order; that way consumers will be able to obtain the entire state of the purchase order also when only seeing the last event pertaining to a given order, after log compaction ran.

  • they must have one more boolean attribute indicating whether a particular event represents the deletion of the event’s aggregate root. Such an event (e.g. of type OrderDeleted) could then be used by the event routing SMT described in the next section to produce a deletion marker for that aggregate root. Log compaction would then remove all events pertaining to the given purchase order when its OrderDeleted event has been written to the topic.

Naturally, when deleting events, the event stream will not be re-playable from its very beginning any longer. Depending on the specific business requirements, it might be sufficient to just keep the final state of a given purchase order, customer etc. This could be achieved using compacted topics and a sufficiently large value for the topic’s delete.retention.ms setting. Another option could be to move historic events to some sort of cold storage (e.g. an Amazon S3 bucket), from where they can be retrieved if needed, followed by reading the latest events from the Kafka topics. Which approach to follow depends on the specific requirements, expected amount of data and expertise in the team developing and operating the solution.
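
To make the compacted topic option a bit more concrete, such a topic could be created like this (again just a sketch; the exact tooling flags depend on the Kafka version, and the partition and replication settings are merely examples):

kafka-topics.sh --bootstrap-server kafka:9092 --create --topic OrderEvents \
  --partitions 3 --replication-factor 1 \
  --config cleanup.policy=compact \
  --config delete.retention.ms=86400000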

Topic Routing

By default, the Debezium connectors will send all change events originating from one given table to the same topic, i.e. we’d end up with a single Kafka topic named dbserver1.inventory.outboxevent which would contain all events, be it order events, customer events etc.

To simplify the implementation of consumers which are only interested in specific event types it makes more sense, though, to have multiple topics, e.g. OrderEvents, CustomerEvents and so on. For instance the shipment service might not be interested in any customer events. By only subscribing to the OrderEvents topic, it will be sure to never receive any customer events.

In order to route the change events captured from the outbox table to different topics, the custom EventRouter SMT configured above is used. Here is the code of its apply() method, which will be invoked by Kafka Connect for each record emitted by the Debezium connector:

@Override
public R apply(R record) {
    // Ignoring tombstones just in case
    if (record.value() == null) {
        return record;
    }

    Struct struct = (Struct) record.value();
    String op = struct.getString("op");

    // ignoring deletions in the outbox table
    if (op.equals("d")) {
        return null;
    }
    else if (op.equals("c")) {
        Long timestamp = struct.getInt64("ts_ms");
        Struct after = struct.getStruct("after");

        String key = after.getString("aggregateid");
        String topic = after.getString("aggregatetype") + "Events";

        String eventId = after.getString("id");
        String eventType = after.getString("type");
        String payload = after.getString("payload");

        Schema valueSchema = SchemaBuilder.struct()
            .field("eventType", after.schema().field("type").schema())
            .field("ts_ms", struct.schema().field("ts_ms").schema())
            .field("payload", after.schema().field("payload").schema())
            .build();

        Struct value = new Struct(valueSchema)
            .put("eventType", eventType)
            .put("ts_ms", timestamp)
            .put("payload", payload);

        Headers headers = record.headers();
        headers.addString("eventId", eventId);

        return record.newRecord(topic, null, Schema.STRING_SCHEMA, key, valueSchema, value,
                record.timestamp(), headers);
    }
    // not expecting update events, as the outbox table is "append only",
    // i.e. event records will never be updated
    else {
        throw new IllegalArgumentException("Record of unexpected op type: " + record);
    }
}

When receiving a delete event (op = d), it will discard that event, as the deletion of event records from the outbox table is not relevant to downstream consumers. Things get more interesting when receiving a create event (op = c). Such a record will be propagated to Apache Kafka.

Debezium’s change events have a complex structure that contains the old (before) and new (after) state of the represented row. The event structure to propagate is obtained from the after state. The aggregatetype value from the captured event record is used to build the name of the topic to send the event to. For instance, events with aggregatetype set to Order will be sent to the OrderEvents topic. aggregateid is used as the message key, making sure all messages of that aggregate will go into the same partition of that topic. The message value is a structure comprising the original event payload (encoded as JSON), the timestamp indicating when the event was produced and the event type. Finally, the event UUID is propagated as a Kafka header field. This allows for efficient duplicate detection by consumers, without having to examine the actual message contents.

Events in Apache Kafka

Now let’s take a look into the OrderEvents and CustomerEvents topics.

If you have checked out the example sources and started all the components via Docker Compose (see the README.md file in the example project for more details), you can place purchase orders via the order service’s REST API like so:

cat resources/data/create-order-request.json | http POST http://localhost:8080/order-service/rest/orders

Similarly, specific order lines can be canceled:

cat resources/data/cancel-order-line-request.json | http PUT http://localhost:8080/order-service/rest/orders/1/lines/2

When using a tool such as the very practical kafkacat utility, you should now see messages like these in the OrderEvents topic:

kafkacat -b kafka:9092 -C -o beginning -f 'Headers: %h\nKey: %k\nValue: %s\n' -q -t OrderEvents
Headers: eventId=d03dfb18-8af8-464d-890b-09eb8b2dbbdd
Key: "4"
Value: {"eventType":"OrderCreated","ts_ms":1550307598558,"payload":"{\"id\": 4, \"lineItems\": [{\"id\": 7, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 8, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"}
Headers: eventId=49f89ea0-b344-421f-b66f-c635d212f72c
Key: "4"
Value: {"eventType":"OrderLineUpdated","ts_ms":1550308226963,"payload":"{\"orderId\": 4, \"newStatus\": \"CANCELLED\", \"oldStatus\": \"ENTERED\", \"orderLineId\": 7}"}

The payload field within the message values is the stringified JSON representation of the original events. The Debezium Postgres connector emits JSONB columns as a string (using the io.debezium.data.Json logical type name), which is why the quotes are escaped. The jq utility, and more specifically its fromjson operator, comes in handy for displaying the event payload in a more readable way:

kafkacat -b kafka:9092 -C -o beginning -t OrderEvents | jq '.payload | fromjson'
{
  "id": 4,
  "lineItems": [
    {
      "id": 7,
      "item": "Debezium in Action",
      "status": "ENTERED",
      "quantity": 2,
      "totalPrice": 39.98
    },
    {
      "id": 8,
      "item": "Debezium for Dummies",
      "status": "ENTERED",
      "quantity": 1,
      "totalPrice": 29.99
    }
  ],
  "orderDate": "2019-01-31T12:13:01",
  "customerId": 123
}
{
  "orderId": 4,
  "newStatus": "CANCELLED",
  "oldStatus": "ENTERED",
  "orderLineId": 7
}

You can also take a look at the CustomerEvents topic to inspect the events representing the creation of an invoice when a purchase order is added.

Duplicate Detection in the Consuming Service

At this point, our implementation of the outbox pattern is fully functional; when the order service receives a request to place an order (or cancel an order line), it will persist the corresponding state in the purchaseorder and orderline tables of its database. At the same time, within the same transaction, corresponding event entries will be added to the outbox table in the same database. The Debezium Postgres connector captures any insertions into that table and routes the events into the Kafka topic corresponding to the aggregate type represented by a given event.

To wrap things up, let’s explore how another microservice such as the shipment service can consume these messages. The entry point into that service is a regular Kafka consumer implementation, which is not too exciting and hence omitted here for the sake of brevity. You can find its source code in the example repository.
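
A minimal sketch of what such a consumer loop could look like is shown below; this is not the actual code from the example, and the consumer configuration as well as the wiring of the handler are assumptions:

public class OrderEventConsumer implements Runnable {

    private final OrderEventHandler orderEventHandler;
    private volatile boolean running = true;

    public OrderEventConsumer(OrderEventHandler orderEventHandler) {
        this.orderEventHandler = orderEventHandler;
    }

    @Override
    public void run() {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties())) {
            consumer.subscribe(Collections.singletonList("OrderEvents"));

            while (running) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // the event UUID was propagated as a Kafka message header by the routing SMT
                    Header eventIdHeader = record.headers().lastHeader("eventId");
                    UUID eventId = UUID.fromString(
                            new String(eventIdHeader.value(), StandardCharsets.UTF_8));

                    orderEventHandler.onOrderEvent(eventId, record.key(), record.value());
                }

                consumer.commitSync();
            }
        }
    }

    private Properties consumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "shipment-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return props;
    }
}

For each incoming message on the OrderEvents topic, the consumer calls the OrderEventHandler: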

@ApplicationScoped
public class OrderEventHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderEventHandler.class);

    @Inject
    private MessageLog log;

    @Inject
    private ShipmentService shipmentService;

    @Transactional
    public void onOrderEvent(UUID eventId, String key, String event) {
        if (log.alreadyProcessed(eventId)) {
            LOGGER.info("Event with UUID {} was already retrieved, ignoring it", eventId);
            return;
        }

        JsonObject json = Json.createReader(new StringReader(event)).readObject();
        JsonObject payload = json.containsKey("schema") ? json.getJsonObject("payload") : json;

        String eventType = payload.getString("eventType");
        Long ts = payload.getJsonNumber("ts_ms").longValue();
        String eventPayload = payload.getString("payload");

        JsonReader payloadReader = Json.createReader(new StringReader(eventPayload));
        JsonObject payloadObject = payloadReader.readObject();

        if (eventType.equals("OrderCreated")) {
            shipmentService.orderCreated(payloadObject);
        }
        else if (eventType.equals("OrderLineUpdated")) {
            shipmentService.orderLineUpdated(payloadObject);
        }
        else {
            LOGGER.warn("Unkown event type");
        }

        log.processed(eventId);
    }
}

The first thing done by onOrderEvent() is to check whether the event with the given UUID has been processed before. If so, any further calls for that same event will be ignored. This is to prevent any duplicate processing of events caused by the "at least once" semantics of this data pipeline. For instance it could happen that the Debezium connector or the consuming service fail before acknowledging the retrieval of a specific event with the source database or the messaging broker, respectively. In that case, after a restart of Debezium or the consuming service, a few events may be processed a second time. Propagating the event UUID as a Kafka message header allows for an efficient detection and exclusion of duplicates in the consumer.

If a message is received for the first time, the message value is parsed and the ShipmentService business method corresponding to the specific event type is invoked with the event payload. Finally, the message is marked as processed in the message log.

This MessageLog simply keeps track of all consumed events in a table within the service’s local database:

@ApplicationScoped
public class MessageLog {

    @PersistenceContext
    private EntityManager entityManager;

    @Transactional(value=TxType.MANDATORY)
    public void processed(UUID eventId) {
        entityManager.persist(new ConsumedMessage(eventId, Instant.now()));
    }

    @Transactional(value=TxType.MANDATORY)
    public boolean alreadyProcessed(UUID eventId) {
        return entityManager.find(ConsumedMessage.class, eventId) != null;
    }
}

That way, should the transaction be rolled back for some reason, the original message will also not be marked as processed and an exception would bubble up to the Kafka event consumer loop. This allows for re-trying to process the message later on.

Note that a more complete implementation should take care of re-trying given messages only for a certain number of times, before re-routing any unprocessable messages to a dead-letter queue or similar. Also there should be some house-keeping on the message log table; periodically, all events older than the consumer’s current offset committed with the broker may be deleted, as it’s ensured that such messages won’t be propagated to the consumer another time.
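
Such house-keeping could for instance be a small clean-up job along the lines of the following sketch; the ConsumedMessage property name and the time-based cut-off are assumptions, and a real implementation might rather derive the cut-off from the offsets committed with the broker:

@ApplicationScoped
public class MessageLogCleaner {

    @PersistenceContext
    private EntityManager entityManager;

    // deletes all message log entries received before the given cut-off;
    // how this method gets scheduled (EJB timer, cron job etc.) is left open here
    @Transactional
    public int deleteMessagesReceivedBefore(Instant cutoff) {
        return entityManager.createQuery(
                "DELETE FROM ConsumedMessage cm WHERE cm.timeOfReceiving < :cutoff")
            .setParameter("cutoff", cutoff)
            .executeUpdate();
    }
}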

Summary

The outbox pattern is a great way for propagating data amongst different microservices.

By only modifying a single resource - the source service’s own database - it avoids any potential inconsistencies of altering multiple resources at the same time which don’t share one common transactional context (the database and Apache Kafka). By writing to the database first, the source service has instant "read your own writes" semantics, which is important for a consistent user experience, allowing query methods invoked following a write to instantly reflect any data changes.

At the same time, the pattern enables asynchronous event propagation to other microservices. Apache Kafka acts as a highly scalable and reliable backbone for the messaging amongst the services. Given the right topic retention settings, new consumers may come up long after an event has been originally produced, and build up their own local state based on the event history.

Putting Apache Kafka into the center of the overall architecture also ensures a decoupling of involved services. If for instance single components of the solution fail or are not available for some time, e.g. during an update, events will simply be processed later on: after a restart, the Debezium connector will continue to tail the outbox table from the point where it left off before. Similarly, any consumer will continue to process topics from its previous offset. By keeping track of already successfully processed messages, duplicates can be detected and excluded from repeated handling.

Naturally, such an event pipeline between different services is eventually consistent, i.e. consumers such as the shipment service may lag a bit behind producers such as the order service. Usually, that’s just fine, though, and can be handled in terms of the application’s business logic. For instance there’ll typically be no need to create a shipment within the very same second as an order has been placed. Also, end-to-end delays of the overall solution are typically low (seconds or even sub-second range), thanks to log-based change data capture which allows for emission of events in near-realtime.

One last thing to keep in mind is that the structure of the events exposed via the outbox should be considered a part of the emitting service’s API. I.e. when needed, their structure should be adjusted carefully and with compatibility considerations in mind. This is to ensure that consumers are not accidentally broken when upgrading the producing service. At the same time, consumers should be lenient when handling messages and for instance not fail when encountering unknown attributes within received events.

Many thanks to Hans-Peter Grahsl, Jiri Pechanec, Justin Holmes and René Kerner for their feedback while writing this post!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.


Automating Cache Invalidation With Change Data Capture

The second-level cache of Hibernate ORM / JPA is a proven and efficient way to increase application performance: caching read-only or rarely modified entities avoids roundtrips to the database, resulting in improved response times of the application.

Unlike the first-level cache, the second-level cache is associated with the session factory (or entity manager factory in JPA terms), so its contents are shared across transactions and concurrent sessions. Naturally, if a cached entity gets modified, the corresponding cache entry must be updated (or purged from the cache), too. As long as the data changes are done through Hibernate ORM, this is nothing to worry about: the ORM will update the cache automatically.

Things get tricky, though, when bypassing the application, e.g. when modifying records directly in the database. Hibernate ORM then has no way of knowing that the cached data has become stale, and it’s necessary to invalidate the affected items explicitly. A common way of doing so is to provide some admin functionality that allows clearing an application’s caches. For this to work, it’s vital to not forget about calling that invalidation functionality, or the application will keep working with outdated cached data.

In the following we’re going to explore an alternative approach for cache invalidation, which works in a reliable and fully automated way: by employing Debezium and its change data capture (CDC) capabilities, you can track data changes in the database itself and react to any applied change. This allows invalidating affected cache entries in near-realtime, without the risk of stale data due to missed changes. If an entry has been evicted from the cache, Hibernate ORM will load the latest version of the entity from the database the next time it is requested.

The Example Application

As an example, consider this simple model of two entities, PurchaseOrder and Item:

Example domain model

A purchase order represents the order of an item, where its total price is the ordered quantity times the item’s base price.

Source Code

The source code of this example is provided on GitHub. If you want to follow along and try out all the steps described in the following, clone the repo and follow the instructions in README.md for building the project.

Modelling order and item as JPA entities is straightforward:

@Entity
public class PurchaseOrder {

    @Id
    @GeneratedValue(generator = "sequence")
    @SequenceGenerator(
        name = "sequence", sequenceName = "seq_po", initialValue = 1001, allocationSize = 50
    )
    private long id;
    private String customer;
    @ManyToOne private Item item;
    private int quantity;
    private BigDecimal totalPrice;

    // ...
}

As changes to items are rare, the Item entity should be cached. This can be done by simply specifying JPA’s @Cacheable annotation:

@Entity
@Cacheable
public class Item {

    @Id
    private long id;
    private String description;
    private BigDecimal price;

    // ...
}

You also need to enable the second-level cache in the META-INF/persistence.xml file. The property hibernate.cache.use_second_level_cache activates the cache itself, and the ENABLE_SELECTIVE cache mode causes only those entities to be put into the cache which are annotated with @Cacheable. It’s also a good idea to enable SQL query logging and cache access statistics. That way you’ll be able to verify whether things work as expected by examining the application log:

<?xml version="1.0" encoding="utf-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="..."
    version="2.2">

    <persistence-unit name="orders-PU-JTA" transaction-type="JTA">
        <jta-data-source>java:jboss/datasources/OrderDS</jta-data-source>
        <shared-cache-mode>ENABLE_SELECTIVE</shared-cache-mode>
        <properties>
            <property name="hibernate.cache.use_second_level_cache" value="true" />

            <property name="hibernate.show_sql" value="true" />
            <property name="hibernate.format_sql" value="true" />
            <property name="hibernate.generate_statistics" value="true" />

            <!-- dialect etc. ... -->
        </properties>
    </persistence-unit>
</persistence>

When running on a Java EE application server (or Jakarta EE, as the stack is called after its donation to the Eclipse Foundation), that’s all you need to enable second-level caching. In the case of WildFly (which is what’s used in the example project), the Infinispan key/value store is used as the cache provider by default.

Now try and see what happens when modifying an item’s price by running some SQL in the database, bypassing the application layer. If you’ve checked out the example source code, comment out the DatabaseChangeEventListener class and start the application as described in the README.md. You then can place purchase orders using curl like this (a couple of example items have been persisted at application start-up):

> curl -H "Content-Type: application/json" \
  -X POST \
  --data '{ "customer" : "Billy-Bob", "itemId" : 10003, "quantity" : 2 }' \
  http://localhost:8080/cache-invalidation/rest/orders
{
    "id" : 1002,
    "customer" : "Billy-Bob",
    "item" : {
        "id" :10003,
        "description" : "North By Northwest",
        "price" : 14.99
    },
    "quantity" : 2,
    "totalPrice" : 29.98
}

The response is the expected one, as the item price is 14.99. Now update the item’s price directly in the database. The example uses Postgres, so you can use the psql CLI utility to do so:

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"'

Placing another purchase order for the same item using curl, you’ll see that the calculated total price doesn’t reflect the update. Not good! But it’s not too surprising, given that the price update was applied completely bypassing the application layer and Hibernate ORM.

The Change Event Handler

Now let’s explore how to use Debezium and CDC to react to changes in the item table and invalidate corresponding cache entries.

While Debezium is most of the time deployed into Kafka Connect (thus streaming change events into Apache Kafka topics), it has another mode of operation that comes in very handy for the use case at hand. Using the embedded engine, you can run the Debezium connectors as a library directly within your application. For each change event received from the database, a configured callback method will be invoked, which in the case at hand will evict the affected item from the second-level cache.

The following picture shows the design of this approach:

Architecture Overview

While this doesn’t come with the scalability and fault tolerance provided by Apache Kafka, it nicely fits the given requirements. As the second-level cache is bound to the application lifecycle, there is for instance no need for the offset management and restarting capabilities provided by the Kafka Connect framework. For the given use case it is enough to receive data change events while the application is running, and using the embedded engine enables exactly that.

Clustered Applications

Note that it still might make sense to use Apache Kafka and the regular deployment of Debezium into Kafka Connect when running a clustered application where each node has a local cache. Instead of registering a connector on each node, Kafka and Connect would allow you to deploy a single connector instance and have the application nodes listen to the topic(s) with the change events. This would result in less resource utilization in the database.
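
To use the embedded engine, the dependencies of the Debezium embedded engine and the Debezium Postgres connector must be added to the project; in a Maven build this could be declared like so (using the versions referenced in this post):

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>0.9.0.Beta1</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>0.9.0.Beta1</version>
</dependency>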

Having added these dependencies to your project, a class DatabaseChangeEventListener for listening to any changes in the database can be implemented like this:

@ApplicationScoped
public class DatabaseChangeEventListener {

    @Resource
    private ManagedExecutorService executorService;

    @PersistenceUnit private EntityManagerFactory emf;

    @PersistenceContext
    private EntityManager em;

    private EmbeddedEngine engine;

    public void startEmbeddedEngine(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Configuration config = Configuration.empty()
                .withSystemProperties(Function.identity()).edit()
                .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
                .with(EmbeddedEngine.ENGINE_NAME, "cache-invalidation-engine")
                .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class)
                .with("name", "cache-invalidation-connector")
                .with("database.hostname", "postgres")
                .with("database.port", 5432)
                .with("database.user", "postgresuser")
                .with("database.password", "postgrespw")
                .with("database.server.name", "dbserver1")
                .with("database.dbname", "inventory")
                .with("database.whitelist", "public")
                .with("snapshot.mode", "never")
                .build();

        this.engine = EmbeddedEngine.create()
                .using(config)
                .notifying(this::handleDbChangeEvent)
                .build();

        executorService.execute(engine);
    }

    @PreDestroy
    public void shutdownEngine() {
        engine.stop();
    }

    private void handleDbChangeEvent(SourceRecord record) {
        if (record.topic().equals("dbserver1.public.item")) {
            Long itemId = ((Struct) record.key()).getInt64("id");
            Struct payload = (Struct) record.value();
            Operation op = Operation.forCode(payload.getString("op"));

            if (op == Operation.UPDATE || op == Operation.DELETE) {
                emf.getCache().evict(Item.class, itemId);
            }
        }
    }
}

Upon application start-up, this configures an instance of the Debezium Postgres connector and sets up the embedded engine for running the connector. The connector options (host name, credentials etc.) are mostly the same as when deploying the connector into Kafka Connect. There is no need for doing an initial snapshot of the existing data, hence the snapshot mode is set to "never".

The offset storage option is used for controlling how connector offsets should be persisted. As it’s not necessary to process any change events occurring while the connector is not running (instead you’d just begin to read the log from the current location after the restart), the in-memory implementation provided by Kafka Connect is used.

Once configured, the embedded engine must be run via an Executor instance. As the example runs in WildFly, a managed executor can simply be obtained through @Resource injection for that purpose (see JSR 236).

The embedded engine is configured to invoke the handleDbChangeEvent() method for each received data change event. In this method, it is first checked whether the incoming event originates from the item table. If that’s the case, and if the change event represents an UPDATE or DELETE statement, the affected Item instance is evicted from the second-level cache. JPA 2.0 provides a simple API for this purpose which is accessible via the EntityManagerFactory.

With the DatabaseChangeEventListener class in place, the cache entry will now automatically be evicted when doing another item update via psql. When placing the first purchase order for that item after the update, you’ll see in the application log how Hibernate ORM executes a query SELECT ... FROM item ... in order to load the item referenced by the order. Also the cache statistics will report one "L2C miss". Upon subsequent orders of that same item it will be obtained from the cache again.

Eventual Consistency

While the event handling happens in near-realtime, it’s important to point out that it still applies eventual consistency semantics. This means that there is a very short time window between the point in time where a transaction is committed and the point in time where the change event is streamed from the log to the event handler and the cache entry is invalidated.

Avoiding Cache Invalidations After Application-triggered Data Changes

The change event listener shown above satisfies the requirement of invalidating cached items after external data changes. But in its current form it is evicting cache items a bit too aggressively: cached items will also be purged when updating an Item instance through the application itself. This is not only not needed (as the cached item already is the current version), but it’s even counter-productive: the superfluous cache evictions will cause additional database roundtrips, resulting in longer response times.

It is therefore necessary to distinguish between data changes performed by the application itself and external data changes. Only in the latter case should the affected items be evicted from the cache. In order to do so, you can leverage the fact that each Debezium data change event contains the id of the originating transaction. Keeping track of all transactions run by the application itself allows triggering the cache eviction only for those items altered by external transactions.

Accounting for this change, the overall architecture looks like so:

Architecture Overview with Transaction Registry

The first thing to implement is the transaction registry, i.e. a class for the transaction bookkeeping:

@ApplicationScoped
public class KnownTransactions {

    private final DefaultCacheManager cacheManager;
    private final Cache<Long, Boolean> applicationTransactions;

    public KnownTransactions() {
        cacheManager = new DefaultCacheManager();
        cacheManager.defineConfiguration(
                "tx-id-cache",
                new ConfigurationBuilder()
                    .expiration()
                        .lifespan(60, TimeUnit.SECONDS)
                    .build()
                );

        applicationTransactions = cacheManager.getCache("tx-id-cache");
    }

    @PreDestroy
    public void stopCacheManager() {
        cacheManager.stop();
    }

    public void register(long txId) {
        applicationTransactions.put(txId, true);
    }

    public boolean isKnown(long txId) {
        return Boolean.TRUE.equals(applicationTransactions.get(txId));
    }
}

This uses the Infinispan DefaultCacheManager for creating and maintaining an in-memory cache of transaction ids encountered by the application. As data change events arrive in near-realtime, the TTL of the cache entries can be rather short (in fact, the value of one minute shown in the example is chosen very conservatively, usually events should be received within seconds).

The next step is to retrieve the current transaction id whenever a request is processed by the application and register it within KnownTransactions. This should happen once per transaction. There are multiple ways for implementing this logic; in the following a Hibernate ORM FlushEventListener is used for this purpose:

class TransactionRegistrationListener implements FlushEventListener {

    private volatile KnownTransactions knownTransactions;

    public TransactionRegistrationListener() {
    }

    @Override
    public void onFlush(FlushEvent event) throws HibernateException {
        event.getSession().getActionQueue().registerProcess( session -> {
            Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()")
                    .setFlushMode(FlushMode.MANUAL)
                    .getSingleResult();

            getKnownTransactions().register(txId.longValue());
        } );
    }

    private  KnownTransactions getKnownTransactions() {
        KnownTransactions value = knownTransactions;

        if (value == null) {
            knownTransactions = value = CDI.current().select(KnownTransactions.class).get();
        }

        return value;
    }
}

As there’s no portable way to obtain the transaction id, this is done using a native SQL query. In the case of Postgres, the txid_current() function can be called for that. Hibernate ORM event listeners are not subject to dependency injection via CDI. Hence the static current() method is used to obtain a handle to the application’s CDI container and get a reference to the KnownTransactions bean.

This listener will be invoked whenever Hibernate ORM is synchronizing its persistence context with the database ("flushing"), which usually happens exactly once when the transaction is committed.

Manual Flushes

The session / entity manager can also be flushed manually, in which case the txid_current() function would be invoked multiple times. That’s neglected here for the sake of simplicity. The actual code in the example repo contains a slightly extended version of this class which makes sure that the transaction id is obtained only once.

To register the flush listener with Hibernate ORM, an Integrator implementation must be created and declared in the META-INF/services/org.hibernate.integrator.spi.Integrator file:

public class TransactionRegistrationIntegrator implements Integrator {

    @Override
    public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory,
            SessionFactoryServiceRegistry serviceRegistry) {
        serviceRegistry.getService(EventListenerRegistry.class)
            .appendListeners(EventType.FLUSH, new TransactionRegistrationListener());
    }

    @Override
    public void disintegrate(SessionFactoryImplementor sessionFactory,
            SessionFactoryServiceRegistry serviceRegistry) {
    }
}
The service file simply contains the fully-qualified name of the integrator class:

io.debezium.examples.cacheinvalidation.persistence.TransactionRegistrationIntegrator

During bootstrap, Hibernate ORM will detect the integrator class (by means of the Java service loader), invoke its integrate() method which in turn will register the listener class for the FLUSH event.

The last step is to exclude any events stemming from transactions run by the application itself in the database change event handler:

@ApplicationScoped
public class DatabaseChangeEventListener {

    // ...

    @Inject
    private KnownTransactions knownTransactions;

    private void handleDbChangeEvent(SourceRecord record) {
        if (record.topic().equals("dbserver1.public.item")) {
            Long itemId = ((Struct) record.key()).getInt64("id");
            Struct payload = (Struct) record.value();
            Operation op = Operation.forCode(payload.getString("op"));
            Long txId = ((Struct) payload.get("source")).getInt64("txId");

            if (!knownTransactions.isKnown(txId) &&
                    (op == Operation.UPDATE || op == Operation.DELETE)) {
                emf.getCache().evict(Item.class, itemId);
            }
        }
    }
}

And with that, you've got all the pieces in place: cached Items will only be evicted after external data changes, but not after changes done by the application itself. To confirm, you can invoke the example’s items resource using curl:

> curl -H "Content-Type: application/json" \
  -X PUT \
  --data '{ "description" : "North by Northwest", "price" : 20.99}' \
  http://localhost:8080/cache-invalidation/rest/items/10003

When placing the next order for the item after this update, you should see that the Item entity is obtained from the cache, i.e. the change event will not have caused the item’s cache entry to be evicted. In contrast, if you update the item’s price once more via psql, the item should be removed from the cache, and the next order request will produce a cache miss, followed by a SELECT against the item table in the database.

Summary

In this blog post we’ve explored how Debezium and change data capture can be employed to invalidate application-level caches after external data changes. Compared to manual cache invalidation, this approach works very reliably (by capturing changes directly from the database log, no events will be missed) and fast (cache eviction happens in near-realtime after the data changes).

As you have seen, not too much glue code is needed to implement this. While the implementation shown here is somewhat specific to the entities of the example, it should be possible to implement the change event handler in a more generic fashion, so that it can handle a set of configured entity types (essentially, the database change listener would have to convert the primary key field(s) from the change events into the primary key type of the corresponding entities in a generic way). Such a generic implementation would also have to provide the logic for obtaining the current transaction id for the most commonly used databases.
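
To give an idea, such a generic handler might look roughly like this (purely illustrative; the topic-to-entity mapping would need to be configurable, and convertKey() is a hypothetical helper for adapting the key value from the change event to the entity's id type):

@ApplicationScoped
public class GenericDatabaseChangeEventListener {

    // ... (emf etc. as before)

    // configured mapping from Debezium topics to the corresponding entity types
    private final Map<String, Class<?>> entityTypesByTopic = new HashMap<>();

    @Inject
    private KnownTransactions knownTransactions;

    private void handleDbChangeEvent(SourceRecord record) {
        Class<?> entityType = entityTypesByTopic.get(record.topic());
        if (entityType == null) {
            return;
        }

        Struct payload = (Struct) record.value();
        Operation op = Operation.forCode(payload.getString("op"));
        Long txId = ((Struct) payload.get("source")).getInt64("txId");

        if (!knownTransactions.isKnown(txId) &&
                (op == Operation.UPDATE || op == Operation.DELETE)) {
            // determine the entity's id type via the JPA metamodel and convert the
            // key from the change event accordingly (convertKey() is hypothetical)
            Class<?> idType = emf.getMetamodel().entity(entityType).getIdType().getJavaType();
            Object id = convertKey((Struct) record.key(), idType);

            emf.getCache().evict(entityType, id);
        }
    }
}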

Please let us know whether you think this would be an interesting extension to have for Debezium and Hibernate ORM. For instance, this could be a new module under the Debezium umbrella, and it would also be a great project to work on, should you be interested in contributing to Debezium. If you have any thoughts on this idea, please post a comment below or come to our mailing list.

Many thanks to Guillaume Smet, Hans-Peter Grahsl and Jiri Pechanec for their feedback while writing this post!

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.


Materializing Aggregate Views With Hibernate and Debezium

Updating external full text search indexes (e.g. Elasticsearch) after data changes is a very popular use case for change data capture (CDC).

As we’ve discussed in a blog post a while ago, the combination of Debezium’s CDC source connectors and Confluent’s sink connector for Elasticsearch makes it straightforward to capture data changes in MySQL, Postgres etc. and push them towards Elasticsearch in near real-time. This results in a 1:1 relationship between tables in the source database and a corresponding search index in Elasticsearch, which is perfectly fine for many use cases.

It gets more challenging though if you’d like to put entire aggregates into a single index. An example could be a customer and all their addresses; those would typically be stored in two separate tables in an RDBMS, linked by a foreign key, whereas you’d like to have just one index in Elasticsearch, containing documents of customers with their addresses embedded, allowing you to efficiently search for customers based on their address.

Following up on the KStreams-based solution we described recently, in this post we’d like to present an alternative for materializing such aggregate views, driven by the application layer.

Overview

The idea is to materialize views in a separate table in the source database, right in the moment the original data is altered.

Aggregates are serialized as JSON structures (which naturally can represent any nested object structure) and stored in a specific table. This is done within the actual transaction altering the data, which means the aggregate view is always consistent with the primary data. In particular, this approach isn’t prone to exposing intermediary aggregations, unlike the KStreams-based solution discussed in the post linked above.

The following picture shows the overall architecture:

Streaming Materialized Aggregate Views to Elasticsearch

Here the aggregate views are materialized by means of a small extension to Hibernate ORM, which stores the JSON aggregates within the source database (note that "aggregate views" are conceptually similar to "materialized views" as known from various RDBMS in that they materialize the result of a "join" operation; technically, however, we’re not using a materialized view but a regular table to store them). Changes to that aggregate table are then captured by Debezium and streamed to one topic per aggregate type. The Elasticsearch sink connector can subscribe to these topics and update the corresponding full-text indexes.

You can find a proof-of-concept implementation (said Hibernate extension and related code) of this idea in our examples repository. Of course the general idea isn’t limited to Hibernate ORM or JPA, you could implement something similar with any other API you’re using to access your data.

Creating Aggregate Views via Hibernate ORM

For the following let’s assume we’re persisting a simple domain model (comprising a Customer entity and a few related ones like Address, (customer) Category etc.) in a database. Using Hibernate for that allows us to make the creation of aggregates fully transparent to the actual application code using a Hibernate event listener. Thanks to its extensible architecture, we can plug such listener into Hibernate just by adding it to the classpath, from where it will be picked up automatically when bootstrapping the entity manager / session factory.

Our example listener reacts to an annotation, @MaterializeAggregate, which marks those entity types that should be the roots of materialized aggregates.
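
The annotation itself is little more than a marker carrying the aggregate name; its definition could look roughly like this (a sketch, the actual definition is part of the PoC in the examples repository):

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface MaterializeAggregate {

    /**
     * The name of the aggregate; also used for naming the topic to which
     * change events for this aggregate type are routed.
     */
    String aggregateName();
}

Applying it to the Customer entity then looks like this: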

@Entity
@MaterializeAggregate(aggregateName="customers-complete")
public class Customer {

    @Id
    private long id;

    private String firstName;

    @OneToMany(mappedBy = "customer", fetch = FetchType.EAGER, cascade = CascadeType.ALL)
    private Set<Address> addresses;

    @ManyToOne
    private Category category;

    ...
}

Now if any entity annotated with @MaterializeAggregate is inserted, updated or deleted via Hibernate, the listener will kick in and materialize a JSON view of the aggregate root (customer) and its associated entities (addresses, category).
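
To give an impression of what that listener does, here is a strongly simplified sketch (the actual PoC is more involved, e.g. it also handles deletions and stores the derived Kafka Connect schemas; AggregateWriter is a hypothetical helper that upserts into the aggregates table):

public class AggregateMaterializationListener
        implements PostInsertEventListener, PostUpdateEventListener {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onPostInsert(PostInsertEvent event) {
        materializeIfAggregateRoot(event.getEntity(), event.getSession());
    }

    @Override
    public void onPostUpdate(PostUpdateEvent event) {
        materializeIfAggregateRoot(event.getEntity(), event.getSession());
    }

    private void materializeIfAggregateRoot(Object entity, EventSource session) {
        MaterializeAggregate materialize =
                entity.getClass().getAnnotation(MaterializeAggregate.class);

        if (materialize == null) {
            return;
        }

        try {
            // serialize the aggregate root and its associations into JSON ...
            String json = mapper.writeValueAsString(entity);

            // ... and upsert it into the aggregates table, within the ongoing transaction
            AggregateWriter.upsert(session, materialize.aggregateName(), entity, json);
        }
        catch (JsonProcessingException e) {
            throw new HibernateException(e);
        }
    }

    @Override
    public boolean requiresPostCommitHanding(EntityPersister persister) {
        return false;
    }
}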

Under the hood the Jackson API is used for serializing the model into JSON. This means you can use any of its annotations to customize the JSON output, e.g. @JsonIgnore to exclude the inverse relationship from Address to Customer:

@Entity
public class Address {

    @Id
    private long id;

    @ManyToOne
    @JoinColumn(name = "customer_id")
    @JsonIgnore
    private Customer customer;

    private String street;

    private String city;

    ...
}

Note that Address itself isn’t marked with @MaterializeAggregate, i.e. it won’t be materialized into an aggregate view by itself.

After using JPA’s EntityManager to insert or update a few customers, let’s take a look at the aggregates table which has been populated by the listener (value schema omitted for the sake of brevity):

> select * from aggregates;

| rootType | keySchema | rootId | materialization | valueSchema |

| customers-complete

| {
  "schema" : {
    "type" : "struct",
    "fields" : [ {
      "type" : "int64",
      "optional" : false,
      "field" : "id"
    } ],
    "optional" : false,
    "name" : "customers-complete.Key"
  }
}

| { "id" : 1004 }

| { "schema" : { ... } }

| {
  "id" : 1004,
  "firstName" : "Anne",
  "lastName" : "Kretchmar",
  "email" : "annek@noanswer.org",
  "tags" : [ "long-term", "vip" ],
  "birthday" : 5098,
  "category" : {
    "id" : 100001,
    "name" : "Retail"
  },
  "addresses" : [ {
    "id" : 16,
    "street" : "1289 University Hill Road",
    "city" : "Canehill",
    "state" : "Arkansas",
    "zip" : "72717",
    "type" : "SHIPPING"
  } ]
} |

The table contains these columns:

  • rootType: The name of the aggregate as given in the @MaterializeAggregate annotation

  • rootId: The aggregate’s id as serialized JSON

  • materialization: The aggregate itself as serialized JSON; in this case a customer and their addresses, category etc.

  • keySchema: The Kafka Connect schema of the row’s key

  • valueSchema: The Kafka Connect schema of the materialization

Let’s talk about the two schema columns for a bit. JSON itself is quite limited as far as its supported data types are concerned. So for instance we’d lose information about a numeric field’s value range (int vs. long etc.) without any additional information. Therefore the listener derives the corresponding schema information for key and aggregate view from the entity model and stores it within the aggregate records.

Now Jackson itself only supports JSON Schema, which would be a bit too limited for our purposes. Hence the example implementation provides custom serializers for Jackson’s schema system, which allow us to emit Kafka Connect’s schema representation (with more precise type information) instead of plain JSON Schema. This will come in handy in the following when we’d like to expand the string-based JSON representations of key and value into properly typed Kafka Connect records.

Capturing Changes to the Aggregate Table

We now have a mechanism in place which transparently persists aggregates into a separate table within the source database, whenever the application data is changed through Hibernate. Note that this happens within the boundaries of the source transaction, so if the transaction is rolled back for some reason, the aggregate view will not be updated either.

The Hibernate listener uses insert-or-update semantics when writing an aggregate view, i.e. for a given aggregate root there’ll always be exactly one corresponding entry in the aggregate table which reflects its current state. If an aggregate root entity is deleted, the listener will also drop the entry from the aggregate table.

So let’s set up Debezium now to capture any changes to the aggregates table:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<-EOF
  {
      "name": "inventory-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "debezium",
          "database.password": "dbz",
          "database.server.id": "184054",
          "database.server.name": "dbserver1",
          "database.whitelist": "inventory",
          "table.whitelist": ".*aggregates",
          "database.history.kafka.bootstrap.servers": "kafka:9092",
          "database.history.kafka.topic": "schema-changes.inventory"
      }
  }
EOF

This registers the MySQL connector with the "inventory" database (we’re using an expanded version of the schema from the Debezium tutorial), capturing any changes to the "aggregates" table.

Expanding JSON

If we now were to browse the corresponding Kafka topic, we’d see data change events in the known Debezium format for all the changes to the aggregates table.

The "materialization" field with the records' "after" state still is a single field containing a JSON string, though. What we’d rather like to have is a strongly typed Kafka Connect record, whose schema exactly describes the aggregate structure and the types of its fields. For that purpose the example project provides an SMT (single message transform) which takes the JSON materialization and the corresponding valueSchema and converts this into a full-blown Kafka Connect record. The same is done for keys. DELETE events are rewritten into tombstone events. Finally, the SMT re-routes every record to a topic named after the aggregate root, allowing consumers to subscribe just to changes to specific aggregate types.

So let’s add that SMT when registering the Debezium CDC connector:

...
"transforms":"expandjson",
"transforms.expandjson.type":"io.debezium.aggregation.smt.ExpandJsonSmt",
...

When now browsing the "customers-complete" topic, we’ll see the strongly typed Kafka Connect records we’d expect:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int64",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false,
        "name": "customers-complete.Key"
    },
    "payload": {
        "id": 1004
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [ ... ],
        "optional": true,
        "name": "urn:jsonschema:com:example:domain:Customer"
    },
    "payload": {
        "id": 1004,
        "firstName": "Anne",
        "lastName": "Kretchmar",
        "email": "annek@noanswer.org",
        "active": true,
        "tags" : [ "long-term", "vip" ],
        "birthday" : 5098,
        "category": {
            "id": 100001,
            "name": "Retail"
        },
        "addresses": [
            {
                "id": 16,
                "street": "1289 University Hill Road",
                "city": "Canehill",
                "state": "Arkansas",
                "zip": "72717",
                "type": "LIVING"
            }
        ]
    }
}

To confirm that these are actual typed Kafka Connect records and not just a single JSON string field, you could for instance use the Avro message converter and examine the message schemas in the schema registry.

Sinking Aggregate Messages Into Elasticsearch

The last missing step is to register the Confluent Elasticsearch sink connector, hooking it up with the "customers-complete" topic and letting it push any changes to the corresponding index:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<-EOF
  {
      "name": "es-customers",
      "config": {
          "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
          "tasks.max": "1",
          "topics": "customers-complete",
          "connection.url": "http://elastic:9200",
          "key.ignore": "false",
          "schema.ignore" : "false",
          "behavior.on.null.values" : "delete",
          "type.name": "customer-with-addresses",
          "transforms" : "key",
          "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.key.field": "id"
      }
  }
EOF

This uses Connect’s ExtractField transformation to obtain just the actual id value from the key struct and use it as key for the corresponding Elasticsearch documents. Specifying the "behavior.on.null.values" option will let the connector delete the corresponding document from the index when encountering a tombstone message (i.e. a message with a key but without value).

Finally, we can use the Elasticsearch REST API to browse the index and of course use its powerful full-text query language to find customers by the address or any other property embedded into the aggregate structure:

> curl -X GET -H "Accept:application/json" \
  http://localhost:9200/customers-complete/_search?pretty

  {
      "_shards": {
          "failed": 0,
          "successful": 5,
          "total": 5
      },
      "hits": {
          "hits": [
              {
                  "_id": "1004",
                  "_index": "customers-complete",
                  "_score": 1.0,
                  "_source": {
                      "active": true,
                      "addresses": [
                          {
                              "city": "Canehill",
                              "id": 16,
                              "state": "Arkansas",
                              "street": "1289 University Hill Road",
                              "type": "LIVING",
                              "zip": "72717"
                          }
                      ],
                      "tags" : [ "long-term", "vip" ],
                      "birthday" : 5098,
                      "category": {
                          "id": 100001,
                          "name": "Retail"
                      },
                      "email": "annek@noanswer.org",
                      "firstName": "Anne",
                      "id": 1004,
                      "lastName": "Kretchmar",
                      "scores": [],
                      "someBlob": null,
                      "tags": []
                  },
                  "_type": "customer-with-addresses"
              }
          ],
          "max_score": 1.0,
          "total": 1
      },
      "timed_out": false,
      "took": 11
  }

And there you have it: a customer’s complete data, including their addresses, categories, tags etc., materialized into a single document within Elasticsearch. If you’re using JPA to update the customer, you’ll see the data in the index being updated accordingly in near-realtime.

Pros and Cons

So what are the advantages and disadvantages of this approach for materializing aggregates from multiple source tables compared to the KStreams-based approach?

The big advantage is consistency and awareness of transactional boundaries, whereas the KStreams-based solution in its suggested form was prone to exposing intermediary aggregates. For instance, if you’re storing a customer and three addresses, it might happen that the streaming query first creates an aggregation of the customer and the two addresses inserted first, and shortly thereafter the complete aggregate with all three addresses. This is not the case for the approach discussed here, as you’ll only ever stream complete aggregates to Kafka. Also, this approach feels a bit more "light-weight", i.e. a simple marker annotation (together with some Jackson annotations for fine-tuning the emitted JSON structures) is enough to materialize aggregates from your domain model, whereas some more effort was needed to set up the required streams, temporary tables etc. with the KStreams solution.

The downside of driving aggregations through the application layer is that it’s not fully agnostic to the way you access the primary data. If you bypass the application, e.g. by patching data directly in the database, these updates would naturally be missed, requiring a refresh of affected aggregates. Then again, this could itself be addressed through change data capture and Debezium: change events to source tables could be captured and consumed by the application itself, allowing it to re-materialize aggregates after external data changes. You also might argue that running JSON serializations within source transactions and storing aggregates within the source database represents some overhead. Often this may be acceptable, though.

Another question to ask is what the advantage of using change data capture on an intermediary aggregate table is over simply posting REST requests to Elasticsearch. The answer is greatly increased robustness and fault tolerance. If the Elasticsearch cluster can’t be accessed for some reason, the machinery of Kafka and Kafka Connect will ensure that any change events will be propagated eventually, once the sink is up again. Also, consumers other than Elasticsearch can subscribe to the aggregate topic, the log can be replayed from the beginning etc.

Note that while we’ve been talking primarily about using Elasticsearch as a data sink, there are also other data stores and connectors that can handle complex, structured records. One example would be MongoDB and the sink connector maintained by Hans-Peter Grahsl, which one could use to sink customer aggregates into MongoDB, for instance enabling efficient retrieval of a customer and all their associated data with a single primary key look-up.

Outlook

The Hibernate ORM extension as well as the SMT discussed in this post can be found in our examples repository. They should be considered to be at "proof-of-concept" level currently.

That being said, we’re considering making this a proper Debezium component, allowing you to employ this aggregation approach within your Hibernate-based applications just by pulling in this new component. To get there, we’d have to improve a few things first, though. Most importantly, an API is needed which will let you (re-)create aggregates on demand, e.g. for existing data or for data updated by bulk updates via the Criteria API / JPQL (which will be missed by listeners). Also, aggregates should be re-created automatically if any of the referenced entities change (with the current PoC, only a change to the customer instance itself will trigger its aggregate view to be rebuilt, but not a change to one of its addresses).

If you like this idea, then let us know about it, so we can gauge the general interest in this. Also, this would be a great item to work on, if you’re interested in contributing to the Debezium project. Looking forward to hearing from you, e.g. in the comment section below or on our mailing list.

Thanks a lot to Hans-Peter Grahsl for his feedback on an earlier version of this post!



Streaming MySQL Data Changes to Amazon Kinesis

Most of the time, Debezium is used to stream data changes into Apache Kafka. What though if you’re using another streaming platform such as Apache Pulsar or a cloud-based solution such as Amazon Kinesis, Azure Event Hubs and the like? Can you still benefit from Debezium’s powerful change data capture (CDC) capabilities and ingest changes from databases such as MySQL, Postgres, SQL Server etc.?

Turns out, with just a bit of glue code, you can! In the following we’ll discuss how to use Debezium to capture changes in a MySQL database and stream the change events into Kinesis, a fully-managed data streaming service available on the Amazon cloud.

Introducing the Debezium Embedded Engine

Debezium is implemented as a set of connectors for Kafka and thus usually is run via Kafka Connect. But there’s one little gem in Debezium which isn’t as widely known yet: the embedded engine.

When using this engine, the Debezium connectors are not executed within Kafka Connect, but as a library embedded into your own Java application. For this purpose, the debezium-embedded module provides a small runtime environment which performs the tasks that’d otherwise be handled by the Kafka Connect framework: requesting change records from the connector, committing offsets etc. Each change record produced by the connector is passed to a configured event handler method, which in our case will convert the record into its JSON representation and submit it to a Kinesis stream, using the Kinesis Java API.

The overall architecture looks like so:

Debezium Embedded Engine Streaming to Amazon Kinesis

Now let’s walk through the relevant parts of the code required for that. A complete executable example can be found in the debezium-examples repo on GitHub.

Set-Up

In order to use Debezium’s embedded engine, add the debezium-embedded dependency as well as the Debezium connector of your choice to your project’s pom.xml. In the following we’re going to use the connector for MySQL. We also need to add a dependency to the Kinesis Client API, so these are the dependencies needed:

...
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>0.8.3.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>0.8.3.Final</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.9.0</version>
</dependency>
...

Configuring the Embedded Engine

The Debezium embedded engine is configured through an instance of io.debezium.config.Configuration. This class can obtain values from system properties or from a given config file, but for the sake of the example we’ll simply pass all required values via its fluent builder API:

Configuration config = Configuration.create()
    .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
    .with(EmbeddedEngine.ENGINE_NAME, "kinesis")
    .with(MySqlConnectorConfig.SERVER_NAME, "kinesis")
    .with(MySqlConnectorConfig.SERVER_ID, 8192)
    .with(MySqlConnectorConfig.HOSTNAME, "localhost")
    .with(MySqlConnectorConfig.PORT, 3306)
    .with(MySqlConnectorConfig.USER, "debezium")
    .with(MySqlConnectorConfig.PASSWORD, "dbz")
    .with(MySqlConnectorConfig.DATABASE_WHITELIST, "inventory")
    .with(MySqlConnectorConfig.TABLE_WHITELIST, "inventory.customers")
    .with(EmbeddedEngine.OFFSET_STORAGE,
        "org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
    .with(MySqlConnectorConfig.DATABASE_HISTORY,
        MemoryDatabaseHistory.class.getName())
    .with("schemas.enable", false)
    .build();

If you’ve ever set up the Debezium MySQL connector in Kafka Connect, most of the properties will look familiar to you.

But let’s talk about the OFFSET_STORAGE and DATABASE_HISTORY options in a bit more detail. They deal with how connector offsets and the database history should be persisted. When running the connector via Kafka Connect, both would typically be stored in specific Kafka topics. But that’s not an option here, so an alternative is needed. For this example we’re simply going to keep the offsets and database history in memory. I.e. if the engine is restarted, this information will be lost and the connector will start from scratch, e.g. with a new initial snapshot.

While out of scope for this blog post, it wouldn’t be too difficult to create alternative implementations of the OffsetBackingStore and DatabaseHistory contracts, respectively. For instance, if you’re fully committed to the AWS cloud services, you could think of storing offsets and database history in the DynamoDB NoSQL store. Note that, unlike Kafka, a Kinesis stream wouldn’t be suitable for storing the database history: the maximum retention period for Kinesis data streams is seven days, whereas the database history must be kept for the entire lifetime of the connector. Another alternative could be to use the existing filesystem-based implementations, FileOffsetBackingStore and FileDatabaseHistory, respectively.
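
For instance, switching from the in-memory implementations to the file-based ones could be as simple as adjusting the engine configuration like this (a sketch; the file names are arbitrary examples):

Configuration config = Configuration.create()
    // ... connector properties as shown above ...
    .with(EmbeddedEngine.OFFSET_STORAGE,
        "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    .with("offset.storage.file.filename", "/tmp/kinesis-offsets.dat")
    .with(MySqlConnectorConfig.DATABASE_HISTORY,
        "io.debezium.relational.history.FileDatabaseHistory")
    .with("database.history.file.filename", "/tmp/kinesis-dbhistory.dat")
    .build();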

The next step is to build an EmbeddedEngine instance from the configuration. Again this is done using a fluent API:

EmbeddedEngine engine = EmbeddedEngine.create()
    .using(config)
    .using(this.getClass().getClassLoader())
    .using(Clock.SYSTEM)
    .notifying(this::sendRecord)
    .build();

The most interesting part here is the notifying call. The method passed here is the one which will be invoked by the engine for each emitted data change record. So let’s take a look at the implementation of this method.

Sending Change Records to Kinesis

The sendRecord() method is where the magic happens. We’ll convert the incoming SourceRecord into an equivalent JSON representation and propagate it to a Kinesis stream.

For that, it’s important to understand some conceptual differences between Apache Kafka and Kinesis. Specifically, messages in Kafka have a key and a value (which both are arbitrary byte arrays). In the case of Debezium, the key of a data change event represents the primary key of the affected record, and the value is a structure comprising the old and new row state as well as some additional metadata.

In Kinesis on the other hand a message contains a data blob (again an arbitrary byte sequence) and a partition key. Kinesis streams can be split up into multiple shards and the partition key is used to determine into which shard a given message should go.

Now one could think of mapping the key from Debezium’s change data events to the Kinesis partition key, but partition keys are limited to a length of 256 bytes. Depending on the length of primary key column(s) in the captured tables, this might not be enough. So a safer option is to create a hash value from the change message key and use that as the partition key. This in turn means that the change message key structure should be added next to the actual value to the Kinesis message’s data blob. While the key column values themselves are part of the value structure, too, a consumer otherwise wouldn’t know which column(s) make up the primary key.

With that in mind, let’s take a look at the sendRecord() implementation:

private void sendRecord(SourceRecord record) {
    // We are interested only in data events not schema change events
    if (record.topic().equals("kinesis")) {
        return;
    }

    // create schema for container with key *and* value
    Schema schema = SchemaBuilder.struct()
        .field("key", record.keySchema())
        .field("value", record.valueSchema())
        .build();

    Struct message = new Struct(schema);
    message.put("key", record.key());
    message.put("value", record.value());

    // create partition key by hashing the record's key
    String partitionKey = String.valueOf(
        record.key() != null ? record.key().hashCode() : -1);

    // create data blob representing the container by using Kafka Connect's
    // JSON converter
    final byte[] payload = valueConverter.fromConnectData(
        "dummy", schema, message);

    // Assemble the put-record request ...
    PutRecordRequest putRecord = new PutRecordRequest();

    putRecord.setStreamName(record.topic());
    putRecord.setPartitionKey(partitionKey);
    putRecord.setData(ByteBuffer.wrap(payload));

    // ... and execute it
    kinesisClient.putRecord(putRecord);
}

The code is quite straightforward; as discussed above, it first creates a container structure holding the key and value of the incoming source record. This structure is then converted into a binary representation using the JSON converter provided by Kafka Connect (an instance of JsonConverter). Then a PutRecordRequest is assembled from that blob, the partition key and the change record’s topic name, which finally is sent to Kinesis.
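
The valueConverter used above is an instance of Kafka Connect’s JsonConverter; it might be set up once like this (a sketch, the example wires it up with the engine configuration):

JsonConverter valueConverter = new JsonConverter();

// "schemas.enable" is false (as in the engine configuration above), so the
// emitted JSON contains just the payload, without a schema envelope
valueConverter.configure(
    Collections.singletonMap("schemas.enable", false), false);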

The Kinesis client object can be re-used and is set up once like so:

// Uses the credentials from the local "default" AWS profile
AWSCredentialsProvider credentialsProvider =
    new ProfileCredentialsProvider("default");

this.kinesisClient = AmazonKinesisClientBuilder.standard()
    .withCredentials(credentialsProvider)
    .withRegion("eu-central-1") // use your AWS region here
    .build();

With that, we’ve set up an instance of Debezium’s EmbeddedEngine which runs the configured MySQL connector and passes each emitted change event to Amazon Kinesis. The last missing step is to actually run the engine. This is done on a separate thread using an Executor, e.g. like so:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

Note that you should also make sure to properly shut down the engine eventually. How that can be done is shown in the accompanying example in the debezium-examples repo.
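
One simple way to do that is a JVM shutdown hook which stops the engine and waits for the executor to finish (a sketch; the example in the repo is a bit more elaborate):

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // signal the engine to stop polling for change records ...
    engine.stop();

    // ... and wait for the engine's thread to finish
    executor.shutdown();
    try {
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));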

Running the Example

Finally let’s take a look at running the complete example and consuming the Debezium CDC events from the Kinesis stream. Start by cloning the examples repository and go to the kinesis directory:

git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/kinesis

Make sure you’ve met the prerequisites described in the example’s README.md; most notably you should have a local Docker installation and you’ll need to have set up an AWS account as well as have the AWS client tools installed. Note that Kinesis isn’t part of the free tier when registering with AWS, i.e. you’ll pay a (small) amount of money when executing the example. Don’t forget to delete the streams you’ve set up once done; we won’t pay your AWS bills :)

Now run Debezium’s MySQL example database to have some data to play with:

docker run -it --rm --name mysql -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw \
  debezium/example-mysql:0.8

Create a Kinesis stream for change events from the customers table:

aws kinesis create-stream --stream-name kinesis.inventory.customers \
  --shard-count 1

Execute the Java application that runs the Debezium embedded engine (if needed, adjust the value of the kinesis.region property in pom.xml to your own region first):

mvn exec:java

This will start up the engine and the MySQL connector, which takes an initial snapshot of the captured database.

In order to take a look at the CDC events in the Kinesis stream, the AWS CLI can be used (usually, you’d implement a Kinesis Streams application for consuming the events). To do so, set up a shard iterator first:

ITERATOR=$(aws kinesis get-shard-iterator --stream-name kinesis.inventory.customers --shard-id 0 --shard-iterator-type TRIM_HORIZON | jq '.ShardIterator')

Note how the jq utility is used to obtain the generated id of the iterator from the JSON structure returned by the Kinesis API. Next that iterator can be used to examine the stream:

aws kinesis get-records --shard-iterator $ITERATOR

You should receive an array of records like this:

{
    "Records": [
        {
            "SequenceNumber":
                "49587760482547027816046765529422807492446419903410339842",
            "ApproximateArrivalTimestamp": 1535551896.475,
            "Data": "eyJiZWZvcm...4OTI3MzN9",
            "PartitionKey": "eyJpZCI6MTAwMX0="
        },
        ...
    ]
}

The Data element is a Base64-encoded representation of the message’s data blob. Again jq comes in handy: we can use it to just extract the Data part of each record and decode the Base64 representation (make sure to use jq 1.6 or newer):

aws kinesis get-records --shard-iterator $ITERATOR | \
  jq -r '.Records[].Data | @base64d' | jq .

Now you should see the change events as JSON, each one with key and value:

{
  "key": {
    "id": 1001
  },
  "value": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "0.8.1.Final",
      "name": "kinesis",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "c",
    "ts_ms": 1535555325628
  }
}
...

Next let’s try and update a record in MySQL:

# Start MySQL CLI client
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 \
  sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" \
  -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

# In the MySQL client
use inventory;
update customers set first_name = 'Trudy' where id = 1001;

If you now fetch the iterator again, you should see one more data change event representing that update:

...

{
  "key": {
    "id": 1001
  },
  "value": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Trudy",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "0.8.1.Final",
      "name": "kinesis",
      "server_id": 223344,
      "ts_sec": 1535627629,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": false,
      "thread": 10,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "u",
    "ts_ms": 1535627622546
  }
}

Once you’re done, stop the embedded engine application by hitting Ctrl + C, stop the MySQL server by running docker stop mysql and delete the kinesis.inventory.customers stream in Kinesis.

Summary and Outlook

In this blog post we’ve demonstrated that Debezium can be used to stream data changes not only into Apache Kafka, but also into other streaming platforms such as Amazon Kinesis. Leveraging its embedded engine and by implementing a bit of glue code, you can benefit from all the CDC connectors provided by Debezium and their capabilities and connect them to the streaming solution of your choice.

And we’re thinking about simplifying this usage of Debezium even further. Instead of requiring you to implement your own application that invokes the embedded engine API, we’re considering providing a small self-contained Debezium runtime which you can simply execute. It’d be configured with the source connector to run and make use of an outbound plug-in SPI with ready-to-use implementations for Kinesis, Apache Pulsar and others. Of course, such a runtime would also provide suitable implementations for safely persisting offsets and database history, and it’d offer means of monitoring, health checks etc. Meaning you could connect the Debezium source connectors with your preferred streaming platform in a robust and reliable way, without any manual coding required!

If you like this idea, then please check out JIRA issue DBZ-651 and let us know about your thoughts, e.g. by leaving a comment on the issue, in the comment section below or on our mailing list.



Five Advantages of Log-Based Change Data Capture

Yesterday I had the opportunity to present Debezium and the idea of change data capture (CDC) to the Darmstadt Java User Group. It was a great evening with lots of interesting discussions and questions. One of the questions being the following: what is the advantage of using a log-based change data capturing tool such as Debezium over simply polling for updated records?

So first of all, what’s the difference between the two approaches? With polling-based (or query-based) CDC you repeatedly run queries (e.g. via JDBC) for retrieving any newly inserted or updated rows from the tables to be captured. Log-based CDC in contrast works by reacting to any changes to the database’s log files (e.g. MySQL’s binlog or MongoDB’s op log).
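
As an illustration, a query-based approach typically boils down to something like the following (a hedged sketch; loadLastPollTimestamp(), publishChange() and storeLastPollTimestamp() are hypothetical helpers, and the last_update_timestamp column must be maintained by the writing application):

// run periodically, e.g. via a scheduled executor
try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password);
        PreparedStatement statement = connection.prepareStatement(
            "SELECT * FROM purchase_orders WHERE last_update_timestamp > ?")) {

    statement.setTimestamp(1, loadLastPollTimestamp());

    try (ResultSet rs = statement.executeQuery()) {
        while (rs.next()) {
            // propagate the changed row to downstream consumers
            publishChange(rs);
        }
    }

    storeLastPollTimestamp(Timestamp.from(Instant.now()));
}
catch (SQLException e) {
    // handle / log the error
}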

As this wasn’t the first time this question came up, I thought I could provide a more extensive answer also here on the blog. That way I’ll be able to refer to this post in the future, should the question come up again :)

So without further ado, here’s my list of five advantages of log-based CDC over polling-based approaches.

All Data Changes Are Captured

By reading the database’s log, you get the complete list of all data changes in their exact order of application. This is vital for many use cases where you are interested in the complete history of record changes. In contrast, with a polling-based approach you might miss intermediary data changes that happen between two runs of the poll loop. For instance it could happen that a record is inserted and deleted between two polls, in which case this record would never be captured by poll-based CDC.

Related to this is the aspect of downtimes, e.g. when updating the CDC tool. With poll-based CDC, only the latest state of a given record would be captured once the CDC tool is back online, missing any earlier changes to the record that occurred during the downtime. A log-based CDC tool will be able to resume reading the database log from the point where it left off before it was shut down, causing the complete history of data changes to be captured.

Low Delays of Events While Avoiding Increased CPU Load

With polling, you might be tempted to increase the frequency of polling attempts in order to reduce the chances of missing intermediary updates. While this works to some degree, polling too frequently may cause performance issues (as the queries used for polling cause load on the source database). On the other hand, expanding the polling interval will reduce the CPU load but may not only result in missed change events but also in a longer delay for propagating data changes. Log-based CDC allows you to react to data changes in near real-time without paying the price of spending CPU time on running polling queries repeatedly.

No Impact on Data Model

Polling requires some indicator to identify those records that have been changed since the last poll. So all the captured tables need to have some column like LAST_UPDATE_TIMESTAMP which can be used to find changed rows. This can be fine in some cases, but in others such requirement might not be desirable. Specifically, you’ll need to make sure that the update timestamps are maintained correctly on all tables to be captured by the writing applications or e.g. through triggers.

Can Capture Deletes

Naturally, polling will not allow you to identify any records that have been deleted since the last poll. Oftentimes that’s a problem for replication-like use cases where you’d like to have an identical data set on the source database and the replication targets, meaning you’d also like to delete records on the sink side if they have been removed in the source database.

Can Capture Old Record State And Further Meta Data

Depending on the source database’s capabilities, log-based CDC can provide the old record state for update and delete events. Whereas with polling, you’ll only get the current row state. Having the old row state handy in a single change event can be interesting for many use cases, e.g. if you’d like to display the complete data change with old and new column values to an application user for auditing purposes.

In addition, log-based approaches often can provide streams of schema changes (e.g. in the form of applied DDL statements) and expose additional metadata such as transaction ids or the user applying a certain change. These things may generally be doable with query-based approaches, too (depending on the capabilities of the database), but I haven’t really seen it done in practice.

Summary

And that’s it, five advantages of log-based change data capture. Note that this is not to say that polling-based CDC doesn’t have its applications. If for instance your use case can be satisfied by propagating changes once per hour and it’s not a problem to miss intermediary versions of records that were valid in between, it can be perfectly fine.

But if you’re interested in capturing data changes in near real-time, making sure you don’t miss any change events (including deletions), then I’d recommend very much to explore the possibilities of log-based CDC as enabled by Debezium. The Debezium connectors do all the heavy-lifting for you, i.e. you don’t have to deal with all the low-level specifics of the individual databases and the means of getting changes from their logs. Instead, you can consume the generic and largely unified change data events produced by Debezium.


