Storing state of a Debezium connector

Overview

Debezium connectors require persistent storage to preserve their state between restarts. All connectors require persistent storage for their offsets. In addition, connectors such as Db2, MySQL, Oracle, and SQL Server require storage for their internal schema history, which records changes to the table schemas in the database.

Offset storage is provided automatically for deployments within the Kafka Connect runtime through one of the following mechanisms:

Kafka offset store

Provides storage for Kafka Connect distributed.

File offset store

Provides storage for Kafka Connect standalone.

If you run a connector in Debezium Engine or Debezium Server, you must explicitly configure the offset store. For connectors that work with schema-based databases, you configure internal schema history storage by setting connector properties.

Kafka

Debezium can use Kafka to store its state, including source offsets and schema history. Connectors use KafkaOffsetBackingStore to store offsets in a Kafka topic (for example, connect-offsets). These offsets ensure that a connector can resume reading from the correct position after it restarts. Connectors store their schema history in a separate compacted topic, such as schema-changes.inventory.

Offset Store

Property Default Description

No default

Must be set to org.apache.kafka.connect.storage.KafkaOffsetBackingStore.

No default

Specifies the Kafka topic where the connector stores its offsets. To ensure that the topic retains the latest offset information, you must enable log compaction for this topic.

25

Specifies the number of partitions for the offset storage topic. Ensure that the value of this setting aligns with the partitioning strategy of the Kafka cluster.

3

Sets the replication factor for the offset storage topic. Replicating data across multiple brokers improves fault tolerance.
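As a rough illustration of the table above, the following sketch builds offset store settings as a java.util.Properties object, the form in which configuration is typically handed to Debezium Engine. The property column is not reproduced in this text, so the keys used here (offset.storage, offset.storage.topic, offset.storage.partitions, offset.storage.replication.factor) are assumptions that you should check against the property table. The class name KafkaOffsetStoreConfig is hypothetical.

```java
import java.util.Properties;

// Sketch only: property keys are assumed, not confirmed by the text above.
final class KafkaOffsetStoreConfig {

    static Properties build() {
        Properties props = new Properties();
        // Class name taken from the offset store table.
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");
        // Reuses the connect-offsets example topic; the topic must be log-compacted.
        props.setProperty("offset.storage.topic", "connect-offsets");
        // Defaults from the table, written out explicitly for readability.
        props.setProperty("offset.storage.partitions", "25");
        props.setProperty("offset.storage.replication.factor", "3");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key=value pairs.
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A real deployment would also need the Kafka connection settings for the offset store, which this sketch leaves out.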

Internal schema history store

Property Default Description

No default

Must be set to io.debezium.storage.kafka.history.KafkaSchemaHistory.

No default

The name of the topic that stores the database schema history.

No default

A list of host and port pairs that the connector uses to establish the initial connection to the Kafka cluster to retrieve its database schema history. This value must match the connection settings that the Kafka Connect process uses to connect to the Kafka cluster.

100

Specifies the time, in milliseconds, that the connector waits between polling requests for persisted data during recovery.

100

Specifies the number of consecutive unsuccessful attempts to retrieve schema history data from Kafka that the connector allows. Recovery attempts stop after the number of attempts exceeds this value. The maximum time that the connector waits after it is unable to retrieve data is recovery.attempts × recovery.poll.interval.ms.

3

Specifies the time, in milliseconds, that the connector waits for a response after the Kafka AdminClient submits a request to fetch cluster information before the request times out.

30

Specifies the time, in milliseconds, that the connector waits for a response after the Kafka AdminClient submits a request to create a Kafka history topic before the request times out.

No Default

Pass-through properties prefix for configuring how producer clients interact with schema history topics.

No Default

Pass-through properties prefix for configuring how consumer clients interact with schema history topics.
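The sketch below shows one way the Kafka schema history settings might be assembled, together with the recovery wait bound described in the table (recovery.attempts × recovery.poll.interval.ms). The keys prefixed with schema.history.internal.kafka. are assumptions because the property column is missing from this text, and the broker address kafka:9092 and the class name are hypothetical.

```java
import java.util.Properties;

// Sketch only: property keys and the broker address are assumptions.
final class KafkaSchemaHistoryConfig {

    static Properties build() {
        Properties props = new Properties();
        // Class name taken from the schema history table.
        props.setProperty("schema.history.internal",
                "io.debezium.storage.kafka.history.KafkaSchemaHistory");
        // Topic name reuses the example from the Kafka section.
        props.setProperty("schema.history.internal.kafka.topic",
                "schema-changes.inventory");
        // Hypothetical broker; must match what the Kafka Connect process uses.
        props.setProperty("schema.history.internal.kafka.bootstrap.servers",
                "kafka:9092");
        return props;
    }

    // Upper bound on the wait when no data can be retrieved:
    // recovery.attempts multiplied by recovery.poll.interval.ms.
    static long maxRecoveryWaitMs(int recoveryAttempts, long pollIntervalMs) {
        return recoveryAttempts * pollIntervalMs;
    }

    public static void main(String[] args) {
        // Defaults from the table: 100 attempts, 100 ms poll interval.
        System.out.println(maxRecoveryWaitMs(100, 100L) + " ms");
    }
}
```

With the default values of 100 attempts and a 100 ms poll interval, the bound works out to 10000 ms.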

File

Debezium can persist the position (offsets) of the connector in a local file on disk. These offsets ensure that after a connector restarts, Debezium can resume reading from the last read position. File storage provides a simple, fast mechanism for storing offsets that is well suited to single-node applications and testing scenarios.

Offset Store

Property Default Description

No default

Must be set to org.apache.kafka.connect.storage.FileOffsetBackingStore.

No default

The path to the file where Debezium stores source connector offsets.

6000ms

Specifies the time, in milliseconds, between attempts to flush the current offset state to the configured offsets file.

Internal schema history store

Property Default Description

No default

Must be set to io.debezium.storage.file.history.FileSchemaHistory.

No default

The path to the file where Debezium records the database schema history.
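A minimal sketch of a file-backed setup follows, storing both offsets and schema history under one directory. The keys offset.storage.file.filename and schema.history.internal.file.filename are assumptions (the property column is not shown in this text), and the file names and the FileStoreConfig class are hypothetical.

```java
import java.util.Properties;

// Sketch only: property keys and file names are assumptions.
final class FileStoreConfig {

    static Properties build(String dataDir) {
        Properties props = new Properties();
        // Offsets: class name taken from the offset store table.
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename",
                dataDir + "/offsets.dat");
        // Schema history: class name taken from the schema history table.
        props.setProperty("schema.history.internal",
                "io.debezium.storage.file.history.FileSchemaHistory");
        props.setProperty("schema.history.internal.file.filename",
                dataDir + "/schema-history.dat");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical data directory.
        build("/var/lib/debezium").forEach(
                (key, value) -> System.out.println(key + "=" + value));
    }
}
```

Keeping the two files separate matters because they hold different kinds of state: one records the read position, the other records table schema changes.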

Memory

MemoryOffsetBackingStore is a volatile, in-memory store that Debezium Embedded uses to track source offsets. Storing offsets in memory preserves the offset state only during the application’s runtime. Offset records are lost if the connector shuts down or crashes. Memory storage is ideal for testing, or for short-lived tasks, but it is not suitable for production environments that require persistent offsets.

Offset Store

Property Default Description

No default

Must be set to org.apache.kafka.connect.storage.MemoryOffsetBackingStore.

Internal schema history store

Property Default Description

No default

Must be set to io.debezium.relational.history.MemorySchemaHistory.

JDBC

The JDBC store can use any relational database to store offsets and the internal schema history. You must provide the JDBC driver for the database. Debezium can store data in the same source database from which it captures events, or you can configure it to use a different database.

Debezium provides pre-configured DML and DDL statements. You can use these default statements, or you can override the defaults with your own statements to provide compatibility with database dialects or to customize them for specific use cases.

Offset Store

Property Default Description

No default

Must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.

No default

JDBC driver connection string to connect to the database.

No default

(Optional) The username through which Debezium connects to the database that stores offset data.

No default

(Optional) Password for the user specified by offset.storage.jdbc.connection.user.

3000 (3 seconds)

(Optional) Specifies the time, in milliseconds, that the connector waits to retry the connection after failed attempts to connect to the offset storage database.

5

(Optional) Specifies the maximum number of times that Debezium retries the connection to the offset storage database after a connection failure.

debezium_offset_storage

The name of the table where Debezium stores offsets.

DDL statement to create the offset table.

DML statement that Debezium uses to read offset values from the table.

DML statement that Debezium uses to write offsets to the table.

DML statement that Debezium uses to remove offsets from the table.
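The following sketch assembles JDBC offset store settings and sets the optional credentials only when they are supplied. The key offset.storage.jdbc.connection.user appears in the table above; the other keys under offset.storage.jdbc. are assumptions, and the JDBC URL, the credentials, and the class name are hypothetical.

```java
import java.util.Properties;

// Sketch only: apart from offset.storage.jdbc.connection.user, keys are assumed.
final class JdbcOffsetStoreConfig {

    static Properties build(String jdbcUrl, String user, String password) {
        Properties props = new Properties();
        // Class name taken from the offset store table.
        props.setProperty("offset.storage",
                "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore");
        props.setProperty("offset.storage.jdbc.connection.url", jdbcUrl);
        // User and password are optional, so skip them when absent.
        if (user != null) {
            props.setProperty("offset.storage.jdbc.connection.user", user);
        }
        if (password != null) {
            props.setProperty("offset.storage.jdbc.connection.password", password);
        }
        // Default table name from the table, written out explicitly.
        props.setProperty("offset.storage.jdbc.table.name",
                "debezium_offset_storage");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical connection details.
        build("jdbc:postgresql://db:5432/debezium", "dbz", "secret").forEach(
                (key, value) -> System.out.println(key + "=" + value));
    }
}
```

The null checks are needed because java.util.Properties rejects null values.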

Deprecated configuration prior to 3.2

Property Default Description

No default

Must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.

No default

JDBC driver connection string to connect to the database.

No default

(Optional) The username through which Debezium connects to the database that stores offset data.

No default

(Optional) Password for the user specified by offset.storage.jdbc.user.

3000 (3 seconds)

(Optional) Specifies the time, in milliseconds, that the connector waits to retry the connection after failed attempts to connect to the offset storage database.

5

(Optional) Specifies the maximum number of times that Debezium retries the connection to the offset storage database after a connection failure.

debezium_offset_storage

The name of the table where Debezium stores offsets.

DDL statement to create the offset table.

DML statement to read the offsets that are stored in the table.

DML statement to write offsets to the table.

DML statement to remove offsets from the table.

Offset Table defaults

CREATE TABLE %s (
    id                  VARCHAR(36) NOT NULL,
    offset_key          VARCHAR(1255),
    offset_val          VARCHAR(1255),
    record_insert_ts    TIMESTAMP NOT NULL,
    record_insert_seq   INTEGER NOT NULL
)

SELECT id, offset_key, offset_val FROM %s
    ORDER BY record_insert_ts, record_insert_seq

INSERT INTO %s(id, offset_key, offset_val, record_insert_ts, record_insert_seq)
    VALUES ( ?, ?, ?, ?, ? )

DELETE FROM %s
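Each default statement contains a %s placeholder. The sketch below assumes that the placeholder stands for the configured table name and shows how a statement could be rendered with String.format; the OffsetTableStatements class and its render helper are hypothetical and are not part of Debezium.

```java
// Sketch only: assumes %s is replaced with the configured offset table name.
final class OffsetTableStatements {

    // Default statements copied from the listing above.
    static final String SELECT_TEMPLATE =
            "SELECT id, offset_key, offset_val FROM %s "
            + "ORDER BY record_insert_ts, record_insert_seq";
    static final String DELETE_TEMPLATE = "DELETE FROM %s";

    // Substitute the table name into a statement template.
    static String render(String template, String tableName) {
        return String.format(template, tableName);
    }

    public static void main(String[] args) {
        // Default table name from the property table.
        String table = "debezium_offset_storage";
        System.out.println(render(SELECT_TEMPLATE, table));
        System.out.println(render(DELETE_TEMPLATE, table));
    }
}
```

If you override a statement, keeping the %s placeholder lets the same statement work with any configured table name, under the assumption stated above.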

Internal schema history store

Property Default Description

No default

Must be set to io.debezium.storage.jdbc.history.JdbcSchemaHistory.

No default

JDBC driver connection string to connect to the database.

No default

(Optional) The username through which Debezium connects to the database that stores schema history data.

No default

(Optional) Password for the user specified by schema.history.internal.jdbc.connection.user.

3000 (3 seconds)

(Optional) Specifies the time, in milliseconds, that the connector waits to retry the connection after an attempt to connect to the internal schema history database fails.

5

(Optional) Specifies the maximum number of times that Debezium retries the connection to the internal schema history database after a connection failure.

debezium_database_history

The name of the table where Debezium stores the internal schema history.

The DDL statement used to create a table to store the internal schema history.

The SELECT statement to read schema changes from the internal schema history table.

The SELECT statement that checks for the existence of an internal schema history storage table.

The INSERT statement that records changes to the internal schema history table.

Deprecated configuration prior to 3.2

Property Default Description

No default

Must be set to io.debezium.storage.jdbc.history.JdbcSchemaHistory.

No default

JDBC driver connection string to connect to the database.

No default

(Optional) The username through which Debezium connects to the database that stores internal schema history data.

No default

(Optional) Password for the user specified by schema.history.internal.jdbc.user.

3000 (3 seconds)

(Optional) Specifies the time, in milliseconds, that the connector waits to retry the connection after an attempt to connect to the internal schema history database fails.

5

(Optional) Specifies the maximum number of times that Debezium retries the connection to the internal schema history database after a connection failure.

debezium_database_history

The name of the table where Debezium stores the internal schema history.

The DDL statement used to create the internal schema history storage table.

The SELECT statement to read schema changes from the internal schema history table.

The SELECT statement that checks for the existence of an internal schema history storage table.

The INSERT statement that records changes to the internal schema history table.

History Table defaults

CREATE TABLE %s (
    id                  VARCHAR(36) NOT NULL,
    history_data        VARCHAR(65000),
    history_data_seq    INTEGER,
    record_insert_ts    TIMESTAMP NOT NULL,
    record_insert_seq   INTEGER NOT NULL
)

SELECT id, history_data, history_data_seq FROM %s
    ORDER BY record_insert_ts, record_insert_seq, id, history_data_seq

SELECT * FROM %s LIMIT 1

INSERT INTO %s(id, history_data, history_data_seq, record_insert_ts, record_insert_seq)
    VALUES ( ?, ?, ?, ?, ? )

Redis

Debezium can use a Jedis client to store data in a Redis cache.

Offset Store

Property Default Description

No default

Must be set to io.debezium.storage.redis.offset.RedisOffsetBackingStore.

metadata:debezium:offsets

The Redis key that Debezium uses to store offsets.

No default

The URL through which Debezium connects to Redis to store offset data.

No default

The user account through which Debezium connects to Redis to store offset data.

No default

The password for the user account through which Debezium connects to Redis to store offset data.

0

The database index (0..15) that Debezium uses to access Redis to store offset data.

false

Specifies whether Debezium uses SSL when communicating with Redis to store offset data.

2000

Specifies the time, in milliseconds, that Debezium waits to establish a connection to Redis before the connection times out.

2000

Specifies the interval, in milliseconds, that Debezium allows for exchanging offset data with Redis before the socket times out. If a data packet is not transferred within the specified interval, Debezium closes the socket.

300

Specifies the time, in milliseconds, that Debezium waits to retry the connection after an initial attempt to connect to Redis fails.

10000

Specifies the maximum time, in milliseconds, that Debezium waits to retry the connection after an attempt to connect to Redis fails.

10

Specifies the maximum number of times that Debezium retries the connection to Redis after connection attempts fail.

false

In Redis environments that are configured to use a replica shard, specifies whether Debezium waits for Redis to verify that it wrote data to the replica.

1000

Specifies the time, in milliseconds, that Debezium waits for confirmation that Redis wrote data to a replica shard before the request times out.

false

Specifies whether Debezium retries failed requests to confirm whether data is written to a replica shard.

1000

Specifies the time, in milliseconds, that Debezium waits after a failure before it resubmits a request to Redis to confirm data is written to a replica shard.
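The sketch below builds Redis offset store settings and illustrates how the initial delay (300 ms), the maximum delay (10000 ms), and the maximum number of attempts (10) from the table might interact. The doubling backoff is an assumption made purely for illustration; the text above does not say how the delay grows between the initial and maximum values. The keys under offset.storage.redis., the address redis:6379, and the class name are also assumptions.

```java
import java.util.Properties;

// Sketch only: property keys and the doubling backoff are assumptions.
final class RedisOffsetStoreConfig {

    static Properties build(String address) {
        Properties props = new Properties();
        // Class name taken from the offset store table.
        props.setProperty("offset.storage",
                "io.debezium.storage.redis.offset.RedisOffsetBackingStore");
        props.setProperty("offset.storage.redis.address", address);
        // Default key from the table, written out explicitly.
        props.setProperty("offset.storage.redis.key",
                "metadata:debezium:offsets");
        return props;
    }

    // Illustrative delay before retry number `attempt` (0-based):
    // start at the initial delay, double each time, cap at the maximum.
    static long retryDelayMs(int attempt, long initialDelayMs, long maxDelayMs) {
        long delay = initialDelayMs;
        for (int i = 0; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        // Walk through the default of 10 attempts with 300 ms and 10000 ms.
        for (int attempt = 0; attempt < 10; attempt++) {
            System.out.println(attempt + ": " + retryDelayMs(attempt, 300L, 10000L) + " ms");
        }
    }
}
```

Under this assumed policy, the delay reaches the 10000 ms cap at the seventh retry and stays there for the remaining attempts.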

Internal schema history store

Property Default Description

No default

Must be set to io.debezium.storage.redis.history.RedisSchemaHistory.

metadata:debezium:schema_history

The Redis key that Debezium uses to store the schema history data.

No default

The URL through which Debezium connects to Redis to store schema history data.

No default

The user account through which Debezium connects to Redis to store schema history data.

No default

The password for the user account through which Debezium connects to Redis to store schema history data.

0

The database index (0..15) that Debezium uses to access Redis to store schema history data.

false

Specifies whether Debezium uses SSL when communicating with Redis to store schema history data.

2000

Specifies the time, in milliseconds, that Debezium waits to establish a connection to Redis before the connection times out.

2000

Specifies the interval, in milliseconds, that Debezium allows for exchanging schema history data with Redis before the socket times out. If a data packet is not transferred within the specified interval, Debezium closes the socket.

300

Specifies the time, in milliseconds, that Debezium waits to retry the connection after an initial attempt to connect to Redis fails.

10000

Specifies the maximum time, in milliseconds, that Debezium waits to retry the connection after an attempt to connect to Redis fails.

10

Specifies the maximum number of times that Debezium retries the connection to Redis after connection attempts fail.

false

In Redis environments that are configured to use a replica shard, specifies whether Debezium waits for Redis to verify that it wrote data to the replica.

1000

Specifies the time, in milliseconds, that Debezium waits for confirmation that Redis wrote data to a replica shard before the request times out.

false

Specifies whether Debezium retries failed requests to confirm whether data is written to a replica shard.

Amazon S3

Debezium can use the Amazon S3 object storage service. Typically, you would use S3 storage when you deploy Debezium with Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Internal schema history store

Property Default Description

No default

Must be set to io.debezium.storage.s3.history.S3SchemaHistory.

No default

(Optional) The identifier of the static access key that Debezium uses to authenticate with S3.

No default

(Optional) The Amazon Web Services (AWS) secret key that Debezium uses to authenticate to S3.

No default

(Optional) Specifies the name of the region that hosts the S3 bucket.

No default

Specifies the name of the S3 bucket that stores the schema history.

No default

Specifies the object name in the bucket that stores the schema history.

No default

(Optional) Specifies a custom URL that Debezium uses to access the S3 service. Provide the URL in the following format: http://<server>:<port>
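A minimal sketch of S3 schema history settings follows. Because the property column is not shown in this text, the keys under schema.history.internal.s3. are assumptions, and the bucket, object, and region values and the class name are hypothetical. The sketch omits the static access key and secret key, which the table marks as optional.

```java
import java.util.Properties;

// Sketch only: property keys under schema.history.internal.s3. are assumed.
final class S3SchemaHistoryConfig {

    static Properties build(String bucket, String objectName, String region) {
        Properties props = new Properties();
        // Class name taken from the schema history table.
        props.setProperty("schema.history.internal",
                "io.debezium.storage.s3.history.S3SchemaHistory");
        // Bucket and object that hold the schema history.
        props.setProperty("schema.history.internal.s3.bucket.name", bucket);
        props.setProperty("schema.history.internal.s3.object.name", objectName);
        // The region is optional, so set it only when provided.
        if (region != null) {
            props.setProperty("schema.history.internal.s3.region.name", region);
        }
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical bucket, object, and region.
        build("debezium-state", "inventory/schema-history.dat", "eu-west-1")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```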

Azure Blob Storage

Debezium can use the Azure Blob storage service to store data. Typically, you would use Azure Blob storage when you deploy Debezium in the Apache Kafka in Azure HDInsight service.

Internal schema history store

Property Default Description

No default

Must be set to io.debezium.storage.azure.blob.history.AzureBlobSchemaHistory.

No default

Specifies the Azure Blob storage connection string.

No default

The name of the account that Debezium uses to connect to Azure.

No default

The name of the Azure container in which Debezium stores data.

No default

The name of the blob where Debezium stores data.

RocketMQ

Debezium can use the RocketMqSchemaHistory class to store and retrieve database schema changes in Apache RocketMQ.

Internal schema history store

Property Default Description

No default

Must be set to io.debezium.storage.rocketmq.history.RocketMqSchemaHistory.

No Default

The name of the RocketMQ topic where Debezium stores the database schema history.

No Default

Specifies the host and port where the Apache RocketMQ NameServer discovery service is available.

false

Specifies whether to enable access control lists in RocketMQ.

No Default

Specifies the RocketMQ access key. This field must include a value if schema.history.internal.rocketmq.acl.enabled is set to true.

No Default

Specifies the RocketMQ secret key. This field must include a value if schema.history.internal.rocketmq.acl.enabled is set to true.

No Default

Specifies the number of consecutive attempts in which RocketMQ returns no data before recovery completes.

No Default

Specifies the time, in milliseconds, that Debezium waits after each poll attempt to recover the history.

No Default

Specifies the time, in milliseconds, that Debezium waits for a write to RocketMQ to complete before the operation times out.
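The table states that the access key and secret key must be set when schema.history.internal.rocketmq.acl.enabled is true. The sketch below expresses that rule as a simple pre-flight check on a Properties object. Only the acl.enabled key comes from the text above; the access.key, secret.key, topic, and name.srv.addr keys are assumptions, and the class and method names are hypothetical.

```java
import java.util.Properties;

// Sketch only: apart from acl.enabled, the property keys are assumed.
final class RocketMqSchemaHistoryConfig {

    static final String PREFIX = "schema.history.internal.rocketmq.";

    static Properties build(String topic, String nameServer) {
        Properties props = new Properties();
        // Class name taken from the schema history table.
        props.setProperty("schema.history.internal",
                "io.debezium.storage.rocketmq.history.RocketMqSchemaHistory");
        props.setProperty(PREFIX + "topic", topic);
        props.setProperty(PREFIX + "name.srv.addr", nameServer);
        return props;
    }

    // True when ACLs are disabled, or when both keys are present and non-blank.
    static boolean aclSettingsValid(Properties props) {
        boolean aclEnabled = Boolean.parseBoolean(
                props.getProperty(PREFIX + "acl.enabled", "false"));
        if (!aclEnabled) {
            return true;
        }
        return hasText(props.getProperty(PREFIX + "access.key"))
                && hasText(props.getProperty(PREFIX + "secret.key"));
    }

    private static boolean hasText(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        // Hypothetical topic and NameServer address.
        Properties props = build("schema-history", "rocketmq:9876");
        props.setProperty(PREFIX + "acl.enabled", "true");
        System.out.println("valid without keys: " + aclSettingsValid(props));
    }
}
```

Running a check like this before starting the connector surfaces a missing key as a configuration error rather than as a connection failure later on.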