Frequently Asked Questions

What is Debezium?

Debezium is a set of distributed services that capture row-level changes in your databases so that your applications can see and respond to those changes. Debezium records all row-level changes committed to each database table in a transaction log. Each application simply reads the transaction logs it is interested in, and sees all of the events in the same order in which they occurred.

Where did the name "Debezium" come from?

The name is a combination of "DBs", as in the abbreviation for multiple databases, and the "-ium" suffix used in the names of many elements of the periodic table. Say it fast: "DBs-ium". If it helps, we say it like "dee-BEE-zee-uhm".

What is Change Data Capture?

Change Data Capture, or CDC, is an older term for a system that monitors and captures the changes in data so that other software can respond to those changes. Data warehouses often had built-in CDC support, since they need to stay up-to-date as the data changes in the upstream OLTP databases.

Debezium is essentially a modern, distributed open source change data capture platform that will eventually support monitoring a variety of database systems.

What databases can Debezium monitor?

The latest version of Debezium includes support for monitoring MySQL database servers, MongoDB replica sets or sharded clusters, PostgreSQL servers, Oracle servers (based on LogMiner and XStream), Db2 servers, Cassandra databases (3.x and 4.x), and SQL Server databases. In addition, there are work-in-progress Debezium connectors for Vitess and Google Spanner, which are released as preview (incubating) versions as of Debezium 2.2.

Note that monitoring PostgreSQL requires installing an extension ("logical decoding plugin") into the PostgreSQL server. Debezium works with Decoderbufs (maintained by the Debezium community) and pgoutput (maintained by the Postgres community).

If your database is hosted on a (managed) cloud service, one of these plug-ins must be installed (or you must be able to install it yourself). Otherwise, you’ll be unable to monitor your database with Debezium. pgoutput is part of Postgres 10 and later; whether it can be enabled depends on the particular database’s configuration.

On CloudSQL, pgoutput is a supported extension and can be installed, as described in Configure PostgreSQL extensions.

On Amazon RDS as well as on Azure Database for PostgreSQL, pgoutput can be used, so both services work with Debezium.

Support for other DBMSes will be added in future releases. See our roadmap.

What are some uses of Debezium?

The primary use of Debezium is to enable applications to respond almost immediately whenever data in databases changes. Applications can do anything with the insert, update, and delete events. They might use the events to know when to remove entries from a cache. They might update search indexes with the data. They might update a derived data store with the same information or with information computed from the changing data, such as with Command Query Responsibility Segregation (CQRS). They might send a push notification to one or more mobile devices. They might aggregate the changes and produce a stream of patches for entities. The events might be stored to form an audit log. They might drive streaming queries, e.g. with Apache Flink or Kafka Streams. They might be used to propagate data between microservices, e.g. employing the outbox pattern.

You can learn more about CDC use cases in this presentation from QCon San Francisco 2019.

Why is Debezium a distributed system?

Debezium is architected to be tolerant of faults and failures, and the only effective way to do that is with a distributed system. Debezium distributes the monitoring processes, or connectors, across multiple machines so that, if anything goes wrong, the connectors can be restarted. The events are recorded and replicated across multiple machines to minimize the risk of information loss.

Can my application directly monitor a single database?

Yes. Although we recommend most people use the full Debezium platform, it is possible for a single application to embed a Debezium connector so it can monitor a database and respond to the events. This approach is indeed far simpler, with fewer moving parts, but it is more limited and far less tolerant of failures. If your application needs at-least-once delivery guarantees of all messages, please consider using the full distributed system.

What does the Debezium platform look like?

A running Debezium system consists of several pieces. A cluster of Apache Kafka brokers provides the persistent, replicated, and partitioned transaction logs where Debezium records all events and from which applications consume all events. The number of Kafka brokers depends largely on the volume of events, the number of database tables being monitored, and the number of applications that are consuming the events. Kafka does rely upon a small cluster of ZooKeeper nodes to manage the responsibilities of each broker.

Each Debezium connector monitors one database cluster/server, and connectors are configured and deployed to a cluster of Kafka Connect services that ensure that each connector is always running, even as Kafka Connect service instances leave and join the cluster. Each Kafka Connect service cluster (a.k.a., group) is independent, so it is possible for each group within an organization to manage its own clusters.

All connectors record their events (and other information) to Apache Kafka, which persists, replicates, and partitions the events for each table in separate topics. Multiple Kafka Connect service clusters can share a single cluster of Kafka brokers, but the number of Kafka brokers depends largely on the volume of events, the number of database tables being monitored, and the number of applications that are consuming the events.

Applications connect to Kafka directly and consume the events within the appropriate topics.

How many databases can be monitored?

Debezium can monitor any number of databases. The number of connectors that can be deployed to a single cluster of Kafka Connect services depends upon the volume and rate of events. However, Debezium supports multiple Kafka Connect service clusters and, if needed, multiple Kafka clusters as well.

How does Debezium affect source databases?

Most databases have to be configured before Debezium can monitor them. For example, a MySQL server must be configured to use the row-level binlog, and to have a user privileged to read the binlog; the Debezium connector must be configured with the correct information, including the privileged user. See the specific connector documentation for details.
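
As an illustration, a minimal MySQL setup might look like the following sketch (the user name, password, and server id are made up for the example; see the MySQL connector documentation for the authoritative settings and privilege list):

# my.cnf
server-id        = 223344
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL

-- a dedicated user with the privileges the connector needs
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';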

Debezium connectors do not store any information inside the upstream databases. However, running a connector may place additional load on the source database.

How are events for a database organized?

Most connectors will record all events for a single database table to a single topic. Additionally, all events within a topic are totally-ordered, meaning that the order of all of those events will be maintained. (Even if events are duplicated during failures, the end result after applying all of the events will remain the same.)

For example, a MySQL connector monitoring a MySQL server/cluster (logically named "dbserverA") records all of the changes to the "Addresses" table within the "Customers" database in the topic named dbserverA.Customers.Addresses. Likewise, all of the changes to the "PaymentMethods" table in the same database will be recorded in the topic named dbserverA.Customers.PaymentMethods.

Why are events so large?

Debezium is designed to monitor upstream databases and produce for each row-level change one or more corresponding events that completely describe those changes. But Debezium connectors work continuously, and their events have to make sense even as the structure of the tables in the upstream databases changes over time. A consumer is also much easier to write if it only has to deal with a single event at a time, rather than having to track state over the entire history of the event stream.

That means each event needs to be completely self-describing: an event’s key and value each contain a payload with the actual information and a schema that fully describes the structure of the information. Consuming applications can process each event, use the schema to understand the structure of the information in that event, and then correctly process the event’s payload. The consuming application can take advantage of the fact that the schema will remain the same for many events in a row, and only when the schema changes might the consuming application need to do a bit more work preparing for the changed structure.

Meanwhile, the Kafka Connect services serialize the connector’s events and record them in Kafka. The JSON converter is very generic and very simple, but it has no choice but to serialize the entire event information. Therefore, events represented in JSON are indeed verbose and large.

However, there’s an alternative: using a schema registry. That way, actual schema information is managed by the registry, while actual change events only contain the id of the corresponding schema in the registry. This results in a much more efficient representation of events as sent to Kafka. Schema registries can be used with different formats like JSON or Avro. Leveraging Avro as the message format has the additional advantage that payloads are serialized into a very compact binary form.

Using this approach, a Kafka Connect converter and the schema registry work together to track the history of each schema over time. Meanwhile, in the consumer, the same converter decodes the compact binary form of the event, reads the identifier of the schema version used by that message, if it hasn’t yet seen that schema version downloads the schema from the schema registry, and finally uses that schema to decode the payload of the event. Again, many events in sequence will share the same schema (and schema version), so most of the time the converter can simply decode the raw compact event into the same schema and payload expected by the consumer.

How do I use a schema registry?

Options for schema registries include the Apicurio API and Schema Registry and the Confluent Schema Registry. Both come with converters for storing/obtaining JSON and Avro schemas in and from the registry.

If you are deploying Debezium connectors to a Kafka Connect worker service, simply make sure the converter JARs of your registry are available and configure the worker service to use the right converter. You will, for example, need to point the converter to your Apicurio Schema Registry, as sketched below. Then simply deploy the Debezium connectors (or really, any other Kafka Connect connectors) to your worker service. See Avro Serialization for a detailed description of how to use the Avro converter with the Apicurio and Confluent registries.
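
A minimal worker configuration using the Apicurio Avro converter might look like the following sketch (the registry URL is an assumption for the example, and the exact property names can vary between registry versions; the Avro Serialization documentation has the authoritative settings):

key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
key.converter.apicurio.registry.auto-register=true
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true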

The tutorial example on GitHub shows in detail how to use a schema registry and the accompanying converters with Debezium.

Our Docker images for Kafka Connect include the Avro converter as an option.

What happens when an application stops or crashes?

To consume the change events for a database, an application creates a Kafka consumer that will connect to the Kafka brokers and consume all events for the topics associated with that database. The consumer is configured to periodically record its position (aka, offset) in each topic. When an application stops gracefully and closes the consumer, the consumer will record the offsets for the last event in each topic. When the application restarts at any later time, the consumer looks up those offsets and starts reading the very next events in each topic. Therefore, under normal operating scenarios, the application sees every event exactly one time.

If the application crashes unexpectedly, then upon restart the application’s consumer will look up the last recorded offsets for each topic, and start consuming events from the last offset for each topic. In most cases, the application will see some of the same events it saw prior to the crash (but after it recorded the offset), followed by the events it had not yet seen. Thus, the application sees every event at least once. The application can reduce the number of events seen more than once by recording the offsets more frequently, although doing so will negatively affect the performance and throughput of the client.

Note that a Kafka consumer can be configured to connect and start reading with the most recent offset in each topic. This can result in missed events, though this is perfectly acceptable for some use cases.
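
As an illustration, a minimal Java consumer might look like the following sketch (the broker address, topic, and group id are made up for the example, and the deserializers depend on the converter actually in use):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "address-cache-updater");      // group under which offsets are recorded
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");              // periodically record the consumer's position
props.put("auto.commit.interval.ms", "5000");         // committing more often reduces re-delivery after a crash
props.put("auto.offset.reset", "earliest");           // start from the beginning when no offset has been recorded

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("dbserverA.Customers.Addresses"));
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
            // process the change event; be prepared for duplicates after an unclean shutdown
        }
    }
}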

What happens when Debezium stops or crashes?

The behavior of Debezium varies depending upon which components are stopped or crashed. If enough of the Kafka brokers were to stop or crash such that each topic partition is housed by fewer than the minimum number of in-sync replicas, then the connectors writing to those topics and the consuming applications reading from those topics will simply block until the Kafka brokers can be restarted or new brokers brought online. Therefore, the minimum number of in-sync replicas has a very large impact on availability, and for consistency reasons should always be at least 1 (if not 3).

The Kafka Connect service is configured to periodically record the position and offsets of each connector. If one of the Kafka Connect service instances in its cluster is stopped gracefully, all connectors running in that process will be stopped gracefully (meaning all positions and offsets will be recorded) and those same connectors will be restarted on other Kafka Connect service instances in the same cluster. When those connectors are restarted, they will continue recording events exactly where they left off, with no duplicate events being recorded.

When one of the connectors running in a Kafka Connect service cluster is stopped gracefully, it will complete its current work and record the latest positions and offsets in Kafka. Downstream applications consuming from the topics will simply wait until new events are added.

If any of the Kafka Connect service instances in its cluster crashes unexpectedly, then all connectors that were running in the crashed process will be restarted on other Kafka Connect service instances in the same cluster. However, when those connectors are restarted, they will begin recording events from the database starting at the position/offset last recorded by the connector before it crashed. This means the newly-restarted connectors will likely record some of the same events they recorded prior to the crash, and these duplicates will be visible to downstream consuming applications.

What happens when a monitored database stops or crashes?

When a database server monitored by Debezium stops or crashes, the Debezium connector will likely try to re-establish communication. Debezium periodically records the connector’s positions and offsets in Kafka, so once the connector establishes communication the connector should continue to read from the last recorded position and offset.

Why must consuming applications expect duplicate events?

When all systems are running nominally or when some or all of the systems are gracefully shut down, then consuming applications can expect to see every event exactly one time. However, when things go wrong it is always possible for consuming applications to see some events more than once; in other words, delivery is at least once.

When Debezium’s systems crash, they are not always able to record their last position/offset. When they are restarted, they recover by starting from the last position they were known to have recorded, and thus the consuming application will still see every event but may see some messages duplicated during recovery.

Additionally, network failures may cause the Debezium connectors to not receive confirmation of writes, resulting in the same event being recorded more than once (the write is retried until confirmation is received).

What is causing intermittent EventDataDeserializationExceptions with the MySQL connector?

If you run into intermittent deserialization exceptions around one minute after starting the connector, with a root cause of type EOFException or java.net.SocketException: Connection reset:

Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1542193955000, eventType=GTID, serverId=91111, headerLength=19, dataLength=46, nextPosition=1058898202, flags=0}
Caused by: java.lang.RuntimeException: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1542193955000, eventType=GTID, serverId=91111, headerLength=19, dataLength=46, nextPosition=1058898202, flags=0}
Caused by: java.io.EOFException

or

Caused by: java.net.SocketException: Connection reset

Then updating the following MySQL server global properties should fix it:

set global slave_net_timeout = 120;          -- default was 30 seconds
set global thread_pool_idle_timeout = 120;

What is Kafka?

Apache Kafka is a fast, scalable, durable, and distributed messaging system that records all messages in replicated, partitioned, and totally-ordered transaction logs. Consumers keep track of their position in the logs, and can control this position independently of all other consumers. This means that some consumers can start from the very beginning of the log while others are keeping up with the most recently-recorded messages. Kafka operates as a dynamic cluster of brokers. Each log partition is replicated to multiple brokers so that, should any broker fail, the cluster still has multiple copies of the partition.

Debezium connectors record all events to a Kafka cluster, and applications consume those events through Kafka.

What is Kafka Connect?

Kafka Connect is a framework for scalably and reliably streaming data between Apache Kafka and other systems. It is a recent addition to the Kafka community, and it makes it simple to define connectors that move large collections of data into and out of Kafka, while the framework does most of the hard work of properly recording the offsets of the connectors. A Kafka Connect service has a RESTful API for managing and deploying connectors; the service can be clustered and will automatically distribute the connectors across the cluster, ensuring that the connector is always running.

Debezium uses the Kafka Connect framework. All of Debezium’s connectors are Kafka Connect source connectors, and as such they can be deployed and managed using the Kafka Connect service.

How to connect to Kafka when using the Debezium Docker images?

When using Docker for Mac or Docker for Windows, the Docker containers run within a lightweight VM. In order to connect to Kafka from your host system, e.g. with a Kafka consumer started in a test in your IDE, you need to specify your host system’s IP address or host name as ADVERTISED_HOST_NAME for the Kafka container: docker run -it --rm --name kafka -p 9092:9092 -e ADVERTISED_HOST_NAME=<%YOUR_HOST_NAME%> --link zookeeper:zookeeper debezium/kafka:{debezium-docker-label}. This name will be published by ZooKeeper to clients asking for the Kafka broker’s name.

What database size can be handled with the default memory settings for the Connect image?

The memory consumption during start-up and runtime depends on the total number of tables in the database that is monitored by Debezium, the number of columns in each table, and also the amount of events coming from the database. As a rule of thumb, the default memory settings (maximum heap set to 256 MB) should handle databases where the total count of columns across all tables is less than 10,000.

How to retrieve DECIMAL field from binary representation?

If Debezium is configured to handle DECIMAL values precisely, they are encoded as org.apache.kafka.connect.data.Decimal. The unscaled value (a BigInteger) is serialized as a byte array. To decode it back, we need to know the scale of the value, either in advance or by obtaining it from the schema. The unwrapping code can look like one of the following snippets, depending on whether the encoded value is available as a byte array or as a string.

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// when the encoded value is available as a byte array
byte[] encoded = ...;
int scale = ...;
final BigDecimal decoded = new BigDecimal(new BigInteger(encoded), scale);

// when the encoded value is available as a Base64-encoded string (e.g. from JSON)
String encoded = ...;
int scale = ...;
final BigDecimal decoded = new BigDecimal(new BigInteger(Base64.getDecoder().decode(encoded)), scale);
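
If the Kafka Connect Schema object of the field is available (for instance when consuming with the Connect API or inside an SMT), the scale does not have to be hard-coded; this sketch assumes a fieldSchema variable holding that schema:

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

Schema fieldSchema = ...;
// the Decimal logical type records the scale as a schema parameter
int scale = Integer.parseInt(fieldSchema.parameters().get(Decimal.SCALE_FIELD));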

How to change the offsets of the source database?

This is a highly technical operation manipulating Kafka Connect internals. Please use it only as a last-resort solution.

Sometimes the database log contains invalid data (such as an invalid date) that needs to be skipped, or it is necessary to reprocess part of the log from the past. There is generally no straightforward way (apart from event.deserialization.failure.handling.mode for the MySQL connector) to achieve this, but there is a workaround that manipulates Kafka Connect’s internal data.

The first step is to find out the name of the topic that stores the connector offsets. This is configured in the offset.storage.topic option.

The next step is to find out the last offset for the given connector, the key under which it is stored, and the partition used to store the offset. An example would be:

$ kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530088501,"file":"mysql-bin.000003","pos":817,"row":1,"server_id":223344,"event":2}
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}

The key for inventory-connector is ["inventory-connector",{"server":"dbserver1"}], the partition number is 11 and the last offset is {"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}.

To move back to a previous offset, the connector should be stopped and the following command issued:

$ echo '["inventory-connector",{"server":"dbserver1"}]|{"ts_sec":1530168950,"file":"mysql-bin.000003","pos":817,"row":1,"server_id":223344,"event":2}' | \
kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11

How to remove committed offsets for a connector?

This is a highly technical operation manipulating Kafka Connect internals. Please use it only as a last-resort solution.

Sometimes while doing experiments (or when a connector was misconfigured at the start) it is necessary to remove the connector offsets to start with a clean state.

The first step is to find out the name of the topic that stores the connector offsets. This is configured in the offset.storage.topic option.

The next step is to find out the last offset for the given connector, the key under which it is stored, and the partition used to store the offset. An example would be:

$ kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530088501,"file":"mysql-bin.000003","pos":817,"row":1,"server_id":223344,"event":2}
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}

The key for inventory-connector is ["inventory-connector",{"server":"dbserver1"}], the partition number is 11 and the last offset is {"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}.

To delete the connector offsets, the connector should be stopped and the following command issued:

$ echo '["inventory-connector",{"server":"dbserver1"}]|' | \
kafkacat -P -Z -b localhost -t my_connect_offsets -K \| -p 11

This command writes a NULL message (a tombstone) for the given key, which logically translates to removing the stored offsets for the given connector.

Why can’t tombstone events be printed with KSQL?

When using the KSQL streaming query engine, tombstone events (as created by the Debezium connector by default when deleting a record in a captured table) are not supported:

PRINT 'dbserver.inventory.orders' FROM BEGINNING;
com.fasterxml.jackson.databind.node.NullNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode

Consider removing tombstone events by using the after-state extraction SMT and its options for dropping tombstones.

Why does the Debezium MySQL connector crash with schema change tools?

When the MySQL connector monitors a table to which a schema change tool like gh-ost or pt-online-schema-change is applied, the connector can crash with an exception thrown from the value converters. These tools create helper tables during the migration process, and those helper tables need to be included among the captured (include-listed) tables.
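
As an illustration, for gh-ost applied to a hypothetical inventory.orders table the include list might need to cover the helper tables as well (the helper table names shown here are an assumption based on gh-ost's default naming and may differ between tool versions):

table.include.list=inventory\.orders,inventory\._orders_gho,inventory\._orders_ghc,inventory\._orders_del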

How to decrease memory consumption in a database-per-tenant pattern?

If your multitenancy is based on single-tenant databases, your Debezium connectors will have to store metadata for columns and tables multiple times. You can decrease memory consumption using the JVM -XX:+UseStringDeduplication flag. All JVM parameters can be passed using the KAFKA_OPTS environment variable. An example in your Dockerfile would be:

ENV KAFKA_OPTS="-XX:+UseStringDeduplication"

How to solve offset flush timeouts?

When the log contains errors like Failed to flush, timed out while waiting for producer to flush outstanding 218630 messages, it means that Kafka Connect is not able to record offsets into the offset topic fast enough.

There can be multiple root causes of and solutions to this problem:

  • The Kafka acks option is set to all and one of the replica brokers is slow at processing the writes.

  • Connect records are generated very fast; the Kafka Connect options offset.flush.interval.ms and offset.flush.timeout.ms should be tuned. The interval should be shortened and the timeout increased, as in the sketch after this list.

  • Debezium is generating very large batches of records; reduce the max.batch.size and max.queue.size connector parameters.
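
A hedged example of such tuning follows; the values are purely illustrative and must be adjusted to the actual workload:

# Kafka Connect worker configuration (connect-distributed.properties)
offset.flush.interval.ms=10000
offset.flush.timeout.ms=60000

# Debezium connector configuration
max.batch.size=1024
max.queue.size=4096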

Why don’t I see DELETE events in some cases?

This may be caused by the use of CASCADE DELETE (foreign keys with the ON DELETE CASCADE action). In this case the deletion events generated by the database are not part of the binlog and thus cannot be captured by Debezium.

Why does the Debezium MySQL connector fail to consume data from an RDS MySQL read replica?

The Debezium MySQL connector requires the server binlog to be enabled. In the case of RDS MySQL, the log_bin property is managed directly by AWS and is set to OFF by default. When the connector executes the SHOW MASTER STATUS command during a snapshot, the result set is empty and an exception is thrown:

Caused by: java.lang.IllegalStateException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
    at io.debezium.connector.mysql.SnapshotReader.lambda$readBinlogPosition$16(SnapshotReader.java:761)
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:444)
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:385)
    at io.debezium.connector.mysql.SnapshotReader.readBinlogPosition(SnapshotReader.java:745)
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:370)

The solution is to enable the log_bin property indirectly, by activating certain product features in RDS MySQL: read replicas and/or automated backups. Upon activating either of them, the log_bin property value will change to ON automatically and the connector will be able to complete snapshots successfully.

Why does the Debezium PostgreSQL connector cause abnormal consumption of WAL disk space?

See WAL Disk Space Consumption in the PostgreSQL connector documentation.

Why is no new schema version created when the content of two columns in a table is swapped?

If two columns in a table are swapped in such a way that after the change the table schema is the same as before, then no new version of the schema gets created in the schema registry. An example of such an operation could be:

  • original table - id, c1, c2 where c1 and c2 are of the same type

  • column swap - id, c2, c1

  • column rename - id, c1, c2

The schema registry creates a new version of a schema only if the schema logically changes; in this case the schema is, to an external observer, the same after the change.

How to enlarge the maximum size of the message delivered to Kafka?

For large transactions it is possible that Kafka Connect emits a message that is larger than the pre-set maximum. The log usually contains an exception similar to:

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1740572 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

To solve the issue, the producer.max.request.size configuration option must be set in the Kafka Connect worker config file connect-distributed.properties. If the global change is not desirable, then the connector can override the default setting using the producer.override.max.request.size configuration option, set to a larger value.

In the latter case it is also necessary to configure the connector.client.config.override.policy=ALL option in the Kafka Connect worker config file connect-distributed.properties. For the Debezium Connect Docker image, the environment variable CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY can be used to configure the option.
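
For example (the size value is illustrative):

# global setting in the Kafka Connect worker configuration (connect-distributed.properties)
producer.max.request.size=2097152

# or, in the connector configuration, when connector.client.config.override.policy=ALL is set on the worker
producer.override.max.request.size=2097152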

Why do JSON messages not contain schema?

If you are using the JsonConverter to convert and serialize the messages emitted by Debezium, the schema is not included in the messages by default. Schemas are enabled using the schemas.enable converter configuration parameter, set either at the worker level (e.g. connect-distributed.properties):

key.converter.schemas.enable=true
value.converter.schemas.enable=true

or at the connector level, depending on where the converter is configured.
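
For instance, a connector registration could carry the following hypothetical fragment to enable schemas for that connector only:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"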

How to enable case sensitive name filtering?

Several configuration options like table.include.list that define a set of regular expressions are not case sensitive when applied to identifiers.

If your environment requires case-sensitive matching (e.g. two identifiers differ only in letter case), then you can use the regular expression flag (?-i) for a given expression to mandate case-sensitive matching.
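
For example (the table name is hypothetical), the following include list matches inventory.Customers but not inventory.customers:

table.include.list=(?-i)inventory\.Customers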

Why do change event values for MySQL TIMESTAMP columns differ between snapshotting and streaming?

This may be the case if Debezium (or rather the MySQL JDBC driver) cannot retrieve the database’s timezone for some reason. In this case, TIMESTAMP values may fail to be normalized to UTC. The database timezone must be specified explicitly in this situation, using the database.connectionTimeZone pass-through connector option (database.serverTimezone must be used for Debezium versions older than 1.7).

MongoDB Connector fails with maximum BSON size error

This may be the case when Debezium (or rather the MongoDB change stream cursor) encounters a change event document whose total size exceeds the BSON document size limit of 16 megabytes. Note that, depending on the capture.mode used, this issue can manifest even when the actual size of the stored documents is significantly lower.

Starting with Debezium version 2.3.4 you can use the cursor.pipeline and cursor.pipeline.order properties to circumvent this error.

First set cursor.pipeline.order to user_first, then set cursor.pipeline to add the following aggregation stage:

[{ "$match": { "$and": [{"$expr": { "$lte": [{"$bsonSize": "$fullDocument"}, 8000]}}, {"$expr": { "$lte": [{"$bsonSize": "$fullDocumentBeforeChange"}, 8000]}}]} }]

In the example above, all events for documents with original or updated size exceeding 8000 bytes will be safely filtered out.