Every now and then, a question comes up in the Debezium chat or on the mailing list about how to ensure exactly-once delivery of the records produced by Debezium. So far, Debezium has aimed only for at-least-once delivery. This means Debezium guarantees that every single change will be delivered and that no change event is missing or skipped. However, in case of failures, restarts, or dropped database connections, the same event can be delivered more than once. In the typical scenario, the event is delivered twice: once before the failure or restart and a second time after it. Exactly-once delivery (or exactly-once semantics) provides a stronger guarantee: every single message is delivered, and at the same time there are no duplicates. So far, our answer has been that users have to implement their own deduplication if they need exactly-once delivery. However, with Kafka Connect support for exactly-once delivery, it seems we can provide exactly-once delivery for Debezium connectors out of the box, with only a small configuration change.

Kafka Connect exactly-once semantics

Kafka itself has provided transaction support, a building block for exactly-once delivery, as well as exactly-once delivery itself, for quite some time (since Kafka 0.11). What was missing was support for exactly-once delivery in Kafka Connect. This changed in Kafka 3.3.0, which added support for exactly-once delivery for source connectors; see KIP-618 and the related Jira issue. From a high-level perspective, events produced by the source connector and written during an uncommitted transaction are not visible to consumers that read with isolation.level=read_committed. They become visible only once the transaction is committed, which also includes a commit of the offsets corresponding to these events. The source connector itself doesn't create the transaction; that is handled by Kafka Connect. The connector only defines transaction boundaries if needed, and in many cases even this is not necessary.
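To make the idea of committing records and offsets together more tangible, here is a toy in-memory model. It is only a sketch of the principle, not how Kafka or Kafka Connect is implemented; the ToyLog class and run_demo function are hypothetical names used for illustration.

```python
class ToyLog:
    """Toy model: records and the source offset become visible only on commit."""

    def __init__(self):
        self.committed = []         # records visible to consumers
        self.committed_offset = 0   # source offset stored with the last commit
        self._pending = []          # records written in the open transaction
        self._pending_offset = None

    def write(self, record, source_offset):
        """Buffer a record together with the source offset it corresponds to."""
        self._pending.append(record)
        self._pending_offset = source_offset

    def commit(self):
        """Publish the buffered records and their offset in one step."""
        self.committed.extend(self._pending)
        if self._pending_offset is not None:
            self.committed_offset = self._pending_offset
        self._pending, self._pending_offset = [], None

    def abort(self):
        """Drop the open transaction, e.g. after a failure; nothing becomes visible."""
        self._pending, self._pending_offset = [], None


def run_demo():
    source = ["a", "b", "c", "d"]
    log = ToyLog()

    # The first transaction commits "a" and "b" together with offset 2.
    for i in range(0, 2):
        log.write(source[i], i + 1)
    log.commit()

    # The second transaction fails before the commit, so "c" is never visible.
    log.write(source[2], 3)
    log.abort()

    # After the restart, reading resumes from the committed offset.
    for i in range(log.committed_offset, len(source)):
        log.write(source[i], i + 1)
    log.commit()
    return log.committed
```

Because the offset moves only when the records do, the restart re-reads "c" without ever having exposed it before, so the final log contains each record once.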

Kafka Connect configuration

Exactly-once semantics (EOS) is currently supported only with Kafka Connect in distributed mode. The only thing the user has to do is enable exactly-once delivery on all Kafka Connect workers by setting exactly.once.source.support=enabled. If you want to do a rolling update of the cluster without shutting it down, you can first update all the workers to exactly.once.source.support=preparing and then gradually switch the workers to exactly.once.source.support=enabled.

A source connector that requires exactly-once delivery needs to be configured in a similar way, by setting exactly.once.support. You can set it to either requested or required:

  • required - Kafka Connect checks that the source connector explicitly supports exactly-once semantics by implementing the SourceConnector::exactlyOnceSupport method. If the connector doesn’t implement this method, the connector fails to start.

  • requested - Kafka Connect skips the check of whether the source connector explicitly supports exactly-once delivery. In this case, you need to carefully check the documentation or the source code of the connector to find out whether it can provide EOS.

To be sure EOS works properly, it’s always better to use exactly.once.support=required when the application requires exactly-once delivery, and to use exactly.once.support=requested only for testing purposes.

Another related source connector configuration option is transaction.boundary. It determines the span of a transaction. Possible values are:

  • poll - all events returned by a single SourceTask::poll method call are wrapped into a single transaction,

  • interval - events are chunked into transactions based on the specified time interval, which can be configured via the transaction.boundary.interval.ms option,

  • connector - the definition of transaction boundaries is delegated to the given source connector.

The default value is poll.
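The difference between the poll and interval boundaries can be illustrated with a small sketch. This is a simplification under stated assumptions: the function names are hypothetical, and the interval variant groups records by event timestamps that we pass in, whereas a real worker decides based on its own clock.

```python
def poll_boundaries(batches):
    """One transaction per poll() batch; empty batches produce no transaction."""
    return [list(batch) for batch in batches if batch]


def interval_boundaries(events, interval_ms):
    """Group (timestamp_ms, record) pairs into transactions by elapsed time.

    A new transaction starts once interval_ms has elapsed since the first
    event of the current transaction.
    """
    transactions = []
    current = []
    window_start = None
    for timestamp_ms, record in events:
        if window_start is None:
            window_start = timestamp_ms
        elif timestamp_ms - window_start >= interval_ms:
            transactions.append(current)
            current = []
            window_start = timestamp_ms
        current.append(record)
    if current:
        transactions.append(current)
    return transactions
```

With poll, the transaction size follows whatever each poll() call returns; with interval, several poll() batches may end up in the same transaction.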

To sum up, if you want to enable EOS and your source connector supports it, you need to configure exactly.once.source.support=enabled on all workers and update connector configurations with exactly.once.support=required.
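A quick sanity check of such a setup could look like the following sketch. It only inspects configuration text that you hand to it; the function names are hypothetical, and the fallback values used for unset properties ("disabled" for workers, "requested" for connectors) are assumptions about the defaults that you should verify against your Kafka version.

```python
WORKER_KEY = "exactly.once.source.support"
CONNECTOR_KEY = "exactly.once.support"


def parse_properties(text):
    """Parse simple key=value lines, ignoring blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_setup(worker_properties, connector_config):
    """Return a list of problems; an empty list means the setup matches the summary."""
    problems = []
    for index, text in enumerate(worker_properties):
        # Assumption: a worker that does not set the property has EOS disabled.
        value = parse_properties(text).get(WORKER_KEY, "disabled")
        if value != "enabled":
            problems.append(f"worker {index}: {WORKER_KEY}={value}")
    # Assumption: a connector that does not set the property uses "requested".
    value = connector_config.get(CONNECTOR_KEY, "requested")
    if value != "required":
        problems.append(f"connector: {CONNECTOR_KEY}={value}")
    return problems
```

Here worker_properties is a list with the contents of each worker's properties file, and connector_config is the "config" part of the connector definition.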

Exactly once delivery and Debezium connectors

Debezium has two phases: the initial snapshot phase and the streaming phase. We have a hard requirement that the initial snapshot has to finish successfully; otherwise it has to be repeated. If there is any failure during the snapshot phase, the snapshot will be taken again after the next start or restart of the connector, and therefore there will be duplicates. Repeating the whole snapshot makes sense, as the data may change before the next start or during the restart. The snapshot should reflect the exact state of the data at the time it is taken, so in case of failure we have to start from scratch. There are probably ways to avoid duplicate events in the initial snapshot phase, but for now let’s focus only on the streaming phase.

When streaming the data, on the other hand, the situation is quite easy. We store the events obtained from the database in an internal Debezium queue, and on each Kafka Connect poll() method call we drain the queue and update the Kafka offset. Therefore the default transaction boundary, which wraps the poll() method, is a perfect fit for Debezium, and Debezium doesn’t have to define any custom transaction boundaries.

It seems Debezium connectors can work with Kafka Connect exactly-once delivery out of the box without any further modifications. However, we have to develop more robust tests for EOS and test all the connectors first. For the connectors that we have tested at least to some extent, we will add an implementation of the SourceConnector::exactlyOnceSupport method. In the meantime, if you want to test EOS in your own test or staging environment, you can use exactly.once.support=requested.

Simple test of exactly once delivery when DB connection breaks

Let’s walk through a simple test of EOS. We will break the Debezium connection to the database while the connector is running and data is continuously being written to the database. In such a case, Debezium throws a retriable exception and Kafka Connect restarts the connector. We are interested in whether the connector really resumes from the point where it stopped and doesn’t send any event twice.

The scenario can look like this:

  1. start Kafka Connect

  2. deploy the Debezium connector

  3. start a custom loader which will continuously insert data into the captured table in the database

  4. kill Debezium connection to the database

  5. let Kafka Connect restart Debezium connector

  6. stop the loader

  7. check whether there are any duplicates in the related Kafka topic.

Here is a concrete example for a Postgres database and the Debezium Postgres connector.

Let’s create a simple test table containing an ID, an integer value (which can be the same as the ID), and a timestamp in case we want to check when the record was created:

DROP TABLE IF EXISTS public.eos_test;  -- remove the table first if exists
CREATE TABLE public.eos_test(id SERIAL NOT NULL PRIMARY KEY, val SERIAL, t TIMESTAMP DEFAULT now());

Don’t forget to update Kafka Connect with exactly.once.source.support=enabled. Also, Kafka Connect has to run in distributed mode so you need to modify config/connect-distributed.properties and run the worker with these properties. Once you have Kafka Connect running, you can start Debezium Postgres connector:

{
    "name": "eos",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "topic.prefix": "eos",
        "table.include.list": "public.eos_test",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "exactly.once.support": "requested"
    }
}

To simplify reading of the topic, we turned off adding the schema to the records and also added the Debezium SMT for extracting the new record state, so that only the new value is stored in the Kafka topic.
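One way to deploy this configuration is through the Kafka Connect REST API. The following sketch assumes that the REST interface listens on http://localhost:8083 and that the JSON above is available as a Python dict; the function names are hypothetical. It uses only the Python standard library.

```python
import json
import urllib.request

# Assumption: the Kafka Connect REST interface runs locally on port 8083.
CONNECT_URL = "http://localhost:8083"


def build_request(connector, base_url=CONNECT_URL):
    """Build a POST /connectors request from a {"name": ..., "config": {...}} dict."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register(connector, base_url=CONNECT_URL):
    """Send the request and return the HTTP status code of the response."""
    with urllib.request.urlopen(build_request(connector, base_url)) as response:
        return response.status
```

For example, after saving the connector definition to a file, you could load it with json.load() and pass the resulting dict to register().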

Now, you can start loading the data into the Postgres test table using e.g. this Python script:

#!/usr/bin/env python3

"""
Loads data into Postgres EOS test table.
"""

import psycopg2

DROP_TABLE = "DROP TABLE IF EXISTS public.eos_test;"
CREATE_TABLE = "CREATE TABLE public.eos_test(id SERIAL NOT NULL PRIMARY KEY," \
               "val SERIAL, t TIMESTAMP DEFAULT now());"
INSERT_INTO_TABLE = "INSERT INTO public.eos_test(val) VALUES (%s)"

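connection = None
cursor = None
try:
    connection = psycopg2.connect(
        user="postgres",
        password="postgres",
        host="127.0.0.1",
        port="5432",
        database="postgres")
    cursor = connection.cursor()

    # Uncomment to (re)create the test table before loading.
    # cursor.execute(DROP_TABLE)
    # cursor.execute(CREATE_TABLE)
    # connection.commit()

    # range() excludes its upper bound, so this inserts 49,999 rows.
    for i in range(1, 50000):
        cursor.execute(INSERT_INTO_TABLE, (i,))
        # Commit each row separately so every insert is its own database transaction.
        connection.commit()
        print(f"wrote {i}")
finally:
    # Both names are initialized above, so cleanup is safe even if connect() fails.
    if cursor:
        cursor.close()
    if connection:
        connection.close()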

It will load 49,999 records into the test table (range(1, 50000) excludes the upper bound). This should take long enough for you to check that the data is being captured by the Debezium connector, switch to another window, and kill the Debezium connection to the database. Depending on the speed of your test environment, you may manage to kill the database connection several times during one run. You can also increase or decrease the number of loaded events.

Once our loader script starts loading data into the database, kill the Debezium connection to the database, e.g. by connecting to the Postgres database and running this command:

SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = 'postgres' AND query like 'START_REPLICATION SLOT %';

This terminates the Debezium connection to the database, which in turn results in a Postgres driver PSQLException and a Kafka Connect RetriableException. As the exception is retriable, Kafka Connect restarts the connector automatically and the connector continues capturing data from the test table. You can check that all the data has arrived in the Kafka topic using the kafka-console-consumer utility:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic eos.public.eos_test
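It can also help to confirm that the connector is running again after the connection was terminated. The sketch below queries the connector status from the Kafka Connect REST API; it assumes the REST interface is reachable at http://localhost:8083, and the function names are hypothetical.

```python
import json
import urllib.request


def all_running(status):
    """Return True if the connector and every task report the state RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/{name}/status and return the parsed JSON document."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)
```

For the connector from this post, all_running(fetch_status("eos")) should return True once the task is back up.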

Once you are sure all the records are in the Kafka topic, you can finally run the checker script, which searches for duplicate events. The script (again in Python) can look like this:

#!/usr/bin/env python3

"""
Check Kafka topic for records with duplicate key/value.
"""

import json
import kafka

UNIQUE_COLUMN = "id"


def deserialize(rec):
    """
    Deserialize the JSON payload and extract the value of UNIQUE_COLUMN.
    Return None for records that cannot be parsed, such as transaction
    boundary markers.
    """
    try:
        value = json.loads(rec.decode("utf-8"))
        if UNIQUE_COLUMN in value:
            return value[UNIQUE_COLUMN]
        print(f"WARN: no value in record {rec}")
    except Exception:  # unlike a bare except, lets Ctrl+C interrupt the script
        print(f"WARN: cannot deserialize record {rec}")

    return None


def check_topic():
    """
    Check Kafka topic for duplicates and print statistics, including skipped
    records.
    """
    consumer = kafka.KafkaConsumer(
        "eos.public.eos_test",
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset="earliest",
        consumer_timeout_ms=1000,
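        # Offsets are committed for this group, so a second run continues where
        # the first one ended; use a new group_id to re-read the whole topic.
        enable_auto_commit=True,
        group_id="eos-group",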
        value_deserializer=deserialize)

    vals = set()  # a set keeps the membership test fast for many records
    items = 0
    duplicates = 0
    skipped = 0
    for msg in consumer:
        val = msg.value
        if val is None:
            print(f"WARN: skipping None value, items: {items}")
            skipped = skipped + 1
            continue

        items = items + 1
        if val in vals:
            print(f"Duplicate value: {val}")
            duplicates = duplicates + 1
        else:
            vals.add(val)

    print(
        f"Found {duplicates} duplicates in {items} items (unique values: "
        f"{len(vals)}, skipped values: {skipped})")


check_topic()

Tail of the output can look as follows:

....
WARN: skipping None value, items: 46297
WARN: skipping None value, items: 48345
WARN: cannot deserialize record b'\x00\x00\x00\x00\x00\x00'
WARN: skipping None value, items: 49999
Found 0 duplicates in 49999 items (unique values: 49999, skipped values: 54)

In this case there are no duplicate records, so everything looks good. The only question is what those skipped events are. These are transaction boundary markers. For some reason, the Python Kafka client cannot cope with them and fails to deserialize them, so we skip them. The Java client should recognize these records and handle them without any issue.
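The checker reports duplicates only. Since exactly-once delivery also means that nothing is missing, a complementary check for gaps can be useful. The following sketch assumes that the table was freshly created, so the id column contains consecutive numbers starting at 1; the function name is hypothetical.

```python
def find_missing_and_duplicates(ids, first, last):
    """Compare consumed ids with the expected inclusive range first..last."""
    seen = set()
    duplicates = []
    for value in ids:
        if value in seen:
            duplicates.append(value)
        seen.add(value)
    # Any id from the expected range that was never consumed is a gap.
    missing = [i for i in range(first, last + 1) if i not in seen]
    return missing, duplicates
```

You could collect the ids in the consumer loop of the checker script and call find_missing_and_duplicates(ids, 1, 49999) at the end; both returned lists should be empty.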

Summary and the next steps

In this blog post we have shown how to configure exactly-once semantics for Kafka Connect source connectors and how to use it with the Debezium Postgres connector. So far, it seems that there are no issues and that at least the Debezium Postgres connector can work fine with exactly-once semantics.

However, not finding an issue of course doesn’t imply that there are no issues. Therefore, as a next step we would like to develop a more rigorous test framework for testing data consistency and exactly-once delivery. We would like to write the tests using the famous Jepsen framework. If we succeed in writing the tests, we will share the results in a follow-up blog post. In the meantime, we would like to encourage you to test exactly-once delivery in your own environments and deployments to increase the chance of discovering any potential bugs. If you run any such test, we would very much appreciate it if you shared the results with us, negative ones when you find a bug as well as positive ones when everything passes.

Vojtěch Juránek

Vojta is a software engineer at Red Hat. He lives in the Czech Republic.

 

