One of the major improvements in Debezium starting in version 1.6 is support for incremental snapshots. In this blog post we are going to explain the motivation for this feature, we will do a deep dive into the implementation details, and we will also show a demo of it.

Why Incremental Snapshots?

One of the biggest pain points in Debezium since its inception was the sup-optimal support for changes to the captured tables list. As a user, you create a new connector with a list of tables to be captured (table.include.list and related options); at a later point in time, it may become necessary to adjust this configuration, so to capture further tables which where not part to CDC initially. If it suffices to only stream changes from these tables, then the problem is pretty simple to solve. But what if you also need to capture the existing contents of the tables?

Capturing existing data in tables is traditionally done by Debezium in the snapshot phase. This phase is executed once upon the first connector start-up, and its objective is capturing consistent data at a point of time (transforming data at rest into data in motion). This can be a fairly long operation, and by definition, it must be executed completely or not at all - a bit like transaction semantics. This means that if the snapshot is not completed due to a connector restart for instance, it must be re-executed from scratch, and everything already done is thrown away. Also, while the snapshot is taken, any data modifications that are executed in parallel in the database are not streamed until the snapshot has been completed. This could lead to problems with database resources for very large snapshots, as transaction logs must be kept available until the streaming is started.

We are thus ended up with three issues to be solved:

  • The near-impossibility of adding of additional tables to the captured tables list, if existing data must be streamed

  • A long-running process for consistent snapshotting that cannot be terminated or resumed

  • Change data streaming being blocked till the snapshot is completed

Legacy Solutions

The problem was well known, and over time we developed workarounds and also ideated possible improvements and new solutions. As a workaround, the general recommendation was to use a multiple connector approach. The user was asked to:

  • Stop the connector

  • Create a new one to take the snapshot of new tables (using the initial_only snapshotting mode)

  • When completed, stop the new connector

  • Reconfigure and start the old connector with newly captured tables added to the list

This method somewhat did the trick, but is very clumsy, and all the questions around snapshot consistency mentioned above still apply.

The next step was a community contribution into the Debezium connector for MySQL via DBZ-175. It was based on the notion of having multiple binary log readers in place. One reader would capture the originally configured tables, while the other one will snapshot the new tables and then capture changes from the new tables. The latter reader would catch up with the original one, and then they would be reconciled and merged into a single one.

The code was working well, but it never left the incubating stage, as the process itself was quite complex and liable to errors in corner cases. Last but not least, it was an ingenious approach, but unfortunately not portable to other connectors.

Watermark-based Snapshots

In late 2019, the Netflix engineering team announced that they had developed an in-house change data capture framework. They also came up with an innovative solution of executing concurrent snapshots using watermarking, described in the paper DBLog: A Watermark Based Change-Data-Capture Framework by Andreas Andreakis and Ioannis Papapanagiotou.

The main idea behind this approach is that change data streaming is executed continuously together with snapshotting. The framework inserts low and high watermarks into the transaction log (by writing to the source database) and between those two points, a part of the snapshotted table is read. The framework keeps a record of database changes in between the watermarks and reconciles them with the snapshotted values, if the same records are snapshotted and modified during the window.

This means that the data is snapshotted in chunks - no lengthy process at the connector start, and also in case of crashes or a controlled termination of the connector, the snapshotting can be resumed since the last completed chunk.

As per Netflix, the implementation is provided for MySQL and PostgreSQL databases.

Signalling Table

Before moving to Debezium’s implementation of the watermark-based snapshotting approach, a small detour is needed.

Sometimes it can be useful to control Debezium from the outside, so to force it to execute some requested action. Let’s suppose it is necessary to re-snapshot an already snapshotted table - a so-called ad-hoc snapshot. The user would need to send a command to Debezium to pause the current operation and do the snapshot. For that purpose, Debezium defines the concept signals, issued via a signalling table. This is a special table, designated for communication between the user and Debezium. Debezium captures the table and when the user requires a certain operation to be executed, they simply write a record to the signalling table (sending a signal). Debezium will receive the captured change and then execute the required action.

Incremental Snapshotting in Debezium

When we became aware of DBLog’s snapshotting approach, we decided that the method is a universal one and that we could try to adopt it in Debezium, too. Also as we share a lot of codebase among the different connectors (using the Debezium connector framework) our objective was to implement it in the Debezium core component, so that all connectors would benefit from the feature at once. The design and implementation were driven by the DDD-3 Debezium design document.

Incremental snapshotting in Debezium is available in form of ad-hoc snapshots. The user does not configure the connector to execute the snapshot, but instead they use the signalling mechanism to send a snapshot signal and thus trigger a snapshot of a set of tables. The signal in question is called execute-snapshot and the signal message follows the format of:

{"data-collections": ["<table-id-1>", "<table-id-2>", "<table-id-3>", ...]}

When a table snapshot is requested, then Debezium will do the following:

  • Obtain the largest primary key in the table; this is the snapshot endpoint, and its value is stored in the connector offsets

  • Split the table into chunks based on the primary key’s total order and of a size as prescribed by the incremental.snapshot.chunk.size configuration option

When a chunk is queried, a dynamic SQL statement is built, selecting the next incremental.snapshot.chunk.size records, whose primary keys are larger than the last one from the previous chunk (or the first primary key for the first chunk) and which are smaller or equal to the recorded maximum primary key.

The default chunk size is 1,024. You may increase the value for efficiency purposes (a smaller total number of snapshot queries will be executed), but this should be balanced with the increased memory consumption needed for the buffer. It is recommended to do some experimentation in your own environment to identify the setting working best for your situation.

The reading of a chunk is a slightly complicated procedure:

  • A snapshot-window-open signal is sent

  • The chunk query is executed and the chunk content is read into memory

  • A snapshot-window-close signal is sent

Why is this needed? Why it is not enough to just query the database? The answers lie in the following picture:

Figure 1. The transaction isolation

Debezium is not the only process accessing the database. We can expect a multitude of processes accessing the database concurrently, potentially accessing the same records which currently are snapshotted. As shown in the picture, any changes to data are written to the transaction log based on the commit order. As it is not possible to precisely time the chunk read transaction to identify potential conflicts, the open and close window events are added to demarcate the time in which the conflicts can happen. Debezium’s task is the deduplication of those conflicts.

For that purpose, Debezium records all events generated by the chunk into a buffer. When the snapshot-window-open signal is received, then all events coming from the transaction log are checked whether they belong to the snapshotted table(s). If yes, then the buffer is checked whether it contains the primary key. If yes, then the snapshot event is dropped from the buffer, as this is a potential conflict. And as it is not possible to correctly order the snapshot and transaction log events, only the transaction log event is kept. When the snapshot-window-close signal is received, the remaining snapshot events in the buffer are sent downstream.

The following image shows an example of how such a buffer works and how are the transaction log events are filtered before being sent downstream:

Figure 2. The buffer in action

Records K2, K3, and K4 exist already in the database. Before the snapshot window opens, records K1 gets inserted, K2 updated, and K3 deleted. These events are sent downstream as they are read from the log. The snapshot windows opens, and its query selects K1, K2, and K4 into the buffer. While the window is open, the deletion of K4 is retrieved from the transaction log; the snapshot event for K4 is dropped from the buffer and the deletion event is sent downstream. K5 and K6 are inserted, which is retrieved from the log, corresponding events will be emitted. Depending on the specific timing, there may be read events for them in the buffer too (in the image that’s the case for K5), which would be dropped. When the snapshot window closes, the remaining snapshot events for K1 and K2 will be emitted from the buffer.

Connector Restarts

By now we have demonstrated that, using the notion of incremental snapshots, the same table(s) can be snapshotted repeatedly, if and when needed, while the connector is running. We have shown that its execution does not stop streaming from the transaction log. The last item is pausing and continuation of the process.

When an incremental snapshot is running, then incremental snapshot context is added to each of the message offsets. The context is represented by three pieces of information:

  • The list of tables to be snapshotted where the first one is the one currently snapshotted

  • The maximum primary key of the table

  • The primary key of the last event from incremental snapshot sent downstream

These three items are enough to resume the snapshot after a connector restart, be it intentionally or after a crash. Upon connector start, the component responsible for the snapshotting reads the data from the offsets. It initializes its internal state and resumes snapshotting after the last processed event. Note that any records which were inserted or updated while the connector wasn’t running, will be processed via the regular stream reading, i.e. they are not subject to the ongoing snapshot.

This approach ensures the robustness of the process, resilience to restarts and crashes, and minimizes the number of redelivered events (at-least-once delivery semantics still apply).

Limitations

The incremental snapshotting has few drawbacks in comparison to the initial consistent snapshot:

  • The snapshotted table must contain primary keys

  • If an event is deleted from the table during the snapshotting process, then one of these situations can happen:

    • A read event and a delete event are received by downstream consumers

    • Only a delete event is be received

  • If an event is updated in the table during the snapshotting process, then one of these situations can happen:

    • A read event and an update event are received by downstream consumers

    • An update event and read event are received (note the opposite order)

    • Only an update event is received (in case the update happened within the chunk that would have emitted the read event, causing that read event to be discarded during de-duplication)

In general, read events should not be understood as the initial state of the record in a table, but as the state of the record at an arbitrary point of time. Semantics for consumers are slightly changed in comparison to traditional initial snapshots in Debezium, while it will be guaranteed that a consumer has received the complete data set after an incremental snapshot has been completed, there won’t be read (snapshot) events for all records, but it could be update events instead. The same goes for delete events: consumers must be prepared to receive such events for records they had not seen before.

Demo

Having discussed the general concepts, let’s explore things a bit more in an example. We will use our standard tutorial deployment to demonstrate ad-hoc incremental snapshotting. We are using PostgreSQL as the source database. For this demo, you will need multiple terminal windows.

In the beginning we will start the deployment, create the signalling table, and start the connector:

# Terminal 1 - start the deployment
# Start the deployment
export DEBEZIUM_VERSION=1.7
docker-compose -f docker-compose-postgres.yaml up

# Terminal 2
# Create a signalling table
echo "CREATE TABLE inventory.dbz_signal (id varchar(64), type varchar(32), data varchar(2048))" | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS="--search_path=inventory" bash -c "psql -U $POSTGRES_USER postgres"

# Start Postgres connector, capture only customers table and enable signalling
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<EOF
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include": "inventory",
        "table.include.list": "inventory.customers,inventory.dbz_signal",
        "signal.data.collection": "inventory.dbz_signal"
    }
}
EOF

From the log we see that as per the table.include.list setting only one table is snapshotted, customers:

connect_1    | 2021-09-24 13:38:21,781 INFO   Postgres|dbserver1|snapshot  Snapshotting contents of 1 tables while still in transaction   [io.debezium.relational.RelationalSnapshotChangeEventSource]

In the next step we will simulate continuous activity in the database:

# Terminal 3
# Continuously consume messages from Debezium topic for customers table
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

# Terminal 4
# Modify records in the database via Postgres client
docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c "i=0; while true; do psql -U $POSTGRES_USER postgres -c \"INSERT INTO customers VALUES(default,'name\$i','surname\$i','email\$i')\"; ((i++)); done"

The topic dbserver1.inventory.customers receives a continuous stream of messages. Now the connector will be reconfigured to also capture the orders table:

# Terminal 5
# Add orders table among the captured
curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/inventory-connector/config -d @- <<EOF
{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory",
    "table.include.list": "inventory.customers,inventory.dbz_signal,inventory.orders",
    "signal.data.collection": "inventory.dbz_signal"
}
EOF

As expected, there are no messages for the orders table:

# Terminal 5
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.orders

Now let’s start an incremental ad-hoc snapshot by sending a signal. The snapshot messages for the orders table are delivered to the dbserver1.inventory.orders topic. Messages for the customers table are delivered without interruption.

# Terminal 5
# Send the signal
echo "INSERT INTO inventory.dbz_signal VALUES ('signal-1', 'execute-snapshot', '{\"data-collections\": [\"inventory.orders\"]}')" | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS="--search_path=inventory" bash -c "psql -U $POSTGRES_USER postgres"

# Check messages for orders table
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.orders

If you were to modify any record in the orders table while the snapshot is running, this would be either emitted as a read event or as an update event, depending on the exact timing and sequence of things.

As the last step, let’s terminate the deployed systems and close all terminals:

# Shut down the cluster
docker-compose -f docker-compose-postgres.yaml down

Summary

In this blog post, we have discussed the motivation for the notion of incremental snapshotting, as introduced by the DBLog paper. We have reviewed the methods used in the past to achieve the described functionality. Then we dived into the deep waters of the implementation of this novel snapshotting approach in Debezium, and in the end we tried to use it live.

We hope you will find incremental snapshotting useful and we look forward to your feedback, experiences, and use cases. In a future blog post, we’ll talk about the support for incremental snaphots of read-only databases (supported by the Debezium MySQL connector as of version 1.7) and how to trigger ad-hoc snapshots using a Kafka topic as the means of signalling instead of a database table.

Jiri Pechanec

Jiri is a software developer (and a former quality engineer) at Red Hat. He spent most of his career with Java and system integration projects and tasks. He lives near Brno, Czech Republic.

   


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve ours existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.