Storing state of a Debezium connector
Overview
Debezium connectors require persistent storage to preserve their state between restarts. All connectors require a mechanism to provide persistent storage for the offsets. In addition, connectors such as Db2, MySQL, Oracle, and SQL Server require additional storage for their so-called internal schema history, which records changes to table schemas in the database.
Offset storage is provided automatically for deployments within the Kafka Connect runtime through one of the following mechanisms:
Kafka offset store |
Provides storage for Kafka Connect in distributed mode. |
File offset store |
Provides storage for Kafka Connect in standalone mode. |
If you run a connector in Debezium Engine or Debezium Server, you must explicitly configure the offset store. For connectors that work with schema-based databases, you configure internal schema history storage by setting connector properties.
Kafka
Debezium can use Kafka to store its state, including source offsets and schema history.
Connectors implement KafkaOffsetBackingStore to store offsets in a Kafka topic (for example, connect-offsets).
These offsets ensure that after a connector restarts, it is able to resume reading from the correct position.
Connectors store their schema history in a separate compacted topic, such as schema-changes.inventory.
Offset Store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
Specifies the Kafka topic where the connector stores its offsets. To ensure that the topic retains the latest offset information, you must enable log compaction for this topic. |
|
25 |
Specifies the number of partitions for the offset storage topic. Ensure that the value of this setting aligns with the partitioning strategy of the Kafka cluster. |
|
3 |
Sets the replication factor for the offset storage topic. Replicating data across multiple brokers improves fault tolerance. |
Internal schema history store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
The name of the topic that stores the database schema history. |
|
No default |
A list of host and port pairs that the connector uses to establish the initial connection to the Kafka cluster to retrieve its database schema history. This value must match the connection settings that the Kafka Connect process uses to connect to the Kafka cluster. |
|
100 |
Specifies the time, in milliseconds, that the connector waits between polling requests for persisted data during recovery. |
|
100 |
Specifies the number of consecutive unsuccessful attempts to retrieve schema history data from Kafka that the connector allows.
Recovery attempts stop after the number of attempts exceeds this value.
The maximum time that the connector waits after it is unable to retrieve data is |
|
3000 |
Specifies the time, in milliseconds, that the connector waits for a response after the Kafka AdminClient submits a request to fetch cluster information before the request times out. |
|
30000 |
Specifies the time, in milliseconds, that the connector waits for a response after the Kafka AdminClient submits a request to create a Kafka history topic before the request times out. |
|
No default |
Pass-through properties prefix for configuring how producer clients interact with schema history topics. |
|
No default |
Pass-through properties prefix for configuring how consumer clients interact with schema history topics. |
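As a rough illustration of how the required settings from the table above might fit into a connector configuration, the following sketch assembles a registration payload of the kind that is submitted to Kafka Connect. The property names `schema.history.internal`, `schema.history.internal.kafka.bootstrap.servers`, and `schema.history.internal.kafka.topic`, and the `KafkaSchemaHistory` class name, are assumptions based on recent Debezium releases; verify them against the version that you run. The connector name and broker addresses are hypothetical.

```python
import json


def kafka_schema_history_config(bootstrap_servers, history_topic):
    """Return only the schema history portion of a connector configuration."""
    return {
        # Assumed property and class names; check them against your Debezium version.
        "schema.history.internal": "io.debezium.storage.kafka.history.KafkaSchemaHistory",
        # Kafka expects the broker list as a comma-separated string of host:port pairs.
        "schema.history.internal.kafka.bootstrap.servers": ",".join(bootstrap_servers),
        "schema.history.internal.kafka.topic": history_topic,
    }


# Hypothetical brokers; the topic name is the example used earlier in this document.
config = kafka_schema_history_config(
    ["kafka-1:9092", "kafka-2:9092"], "schema-changes.inventory"
)

# Wrap the settings in a payload with a hypothetical connector name.
payload = json.dumps({"name": "inventory-connector", "config": config}, indent=2)
print(payload)
```

A real configuration also needs the connector class and the source database settings, which are omitted here to keep the focus on schema history storage.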
File
Debezium can persist the position (offsets) of the connector in a local file on disk. These offsets ensure that after a connector restarts, Debezium can resume reading from the last read position. File storage provides a simple, fast mechanism for storing offsets that is ideal for single-node applications or testing scenarios.
Offset Store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
The path to the file where Debezium stores source connector offsets. |
|
6000ms |
Specifies the time, in milliseconds, between attempts to flush the current offset state to the configured offsets file. |
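The interplay between periodic flushing and resuming after a restart can be sketched with a toy file-backed store. This is a conceptual illustration only: the class `FileOffsetSketch`, the JSON layout, and the offset keys are hypothetical and do not reflect the on-disk format that Debezium actually uses.

```python
import json
import os
import tempfile


class FileOffsetSketch:
    """Toy file-backed offset store; not Debezium's on-disk format."""

    def __init__(self, path):
        self.path = path
        self.offsets = {}
        # On startup, reload whatever was flushed before the last shutdown.
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as fh:
                self.offsets = json.load(fh)

    def set(self, key, value):
        # Updates stay in memory until the next flush.
        self.offsets[key] = value

    def flush(self):
        # Write to a temporary file and then rename it, so that an
        # interrupted write is less likely to corrupt the offsets file.
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(self.offsets, fh)
        os.replace(tmp_path, self.path)


path = os.path.join(tempfile.mkdtemp(), "offsets.json")

store = FileOffsetSketch(path)
store.set("inventory", {"pos": 154})
store.flush()
store.set("inventory", {"pos": 200})  # not flushed, so it is lost on restart

# Simulate a restart by creating a new store over the same file.
restarted = FileOffsetSketch(path)
print(restarted.offsets)
```

In the sketch, the position that was recorded after the last flush does not survive the restart, which is why a shorter flush interval narrows the window of events that might be read again.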
Memory
MemoryOffsetBackingStore is a volatile, in-memory store that Debezium Embedded uses to track source offsets.
Storing offsets in memory preserves the offset state only during the application’s runtime.
Offset records are lost if the connector shuts down or crashes.
Memory storage is ideal for testing, or for short-lived tasks, but it is not suitable for production environments that require persistent offsets.
Offset Store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
JDBC
The JDBC store can use any relational database to store the offset data, provided that you supply the JDBC driver for that database. Debezium can store data in the same source database from which it captures events, or you can configure it to use a different database.
Debezium provides pre-configured DML and DDL statements. You can use these default statements, or you can override the defaults with your own statements to provide compatibility with database dialects or to customize them for specific use cases.
Offset Store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
JDBC driver connection string to connect to the database. |
|
No default |
(Optional) The username through which Debezium connects to the database that stores offset data. |
|
No default |
(Optional) Password for the user specified by offset.storage.jdbc.connection.user. |
|
3 seconds |
(Optional) Specifies the time, in milliseconds, that the connector waits to retry the connection after failed attempts to connect to the offset storage database. |
|
5 |
(Optional) Specifies the maximum number of times that Debezium retries the connection to the offset storage database after a connection failure. |
|
|
The name of the table where Debezium stores offsets. |
|
DDL statement to create the offset table. |
||
DML statement that Debezium uses to read offsets values from the table. |
||
DML statement that Debezium uses to write offsets to the table. |
||
DML statement that Debezium uses to remove offsets from the table. |
Deprecated configuration prior to 3.2
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
JDBC driver connection string to connect to the database. |
|
No default |
(Optional) The username through which Debezium connects to the database that stores offset data. |
|
No default |
(Optional) Password for the user specified by |
|
3 seconds |
(Optional) Specifies the time, in milliseconds, that the connector waits to retry the connection after failed attempts to connect to the offset storage database. |
|
5 |
(Optional) Specifies the maximum number of times that Debezium retries the connection to the offset storage database after a connection failure. |
|
|
The name of the table where Debezium stores offsets. |
|
DDL statement to create the offset table. |
||
DML statement to read offsets stored in the table. |
||
DML statement to write offsets to the table. |
||
DML statement to remove offsets from the table. |
Offset Table defaults
CREATE TABLE %s ( id VARCHAR(36) NOT NULL, offset_key VARCHAR(1255), offset_val VARCHAR(1255), record_insert_ts TIMESTAMP NOT NULL, record_insert_seq INTEGER NOT NULL)
SELECT id, offset_key, offset_val FROM %s ORDER BY record_insert_ts, record_insert_seq
INSERT INTO %s(id, offset_key, offset_val, record_insert_ts, record_insert_seq) VALUES ( ?, ?, ?, ?, ? )
DELETE FROM %s
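To see how the `%s` placeholder and the `?` parameters in these default statements are meant to be filled in, the following sketch runs them against an in-memory SQLite database. It is an illustration, not Debezium's implementation: the table name is hypothetical, SQLite stands in for the target database, and the delete-then-insert write pattern is an assumption about how the statements are combined.

```python
import sqlite3
import uuid
from datetime import datetime, timezone

TABLE = "debezium_offset_storage"  # hypothetical table name

# Default statements, copied from the list above.
CREATE = ("CREATE TABLE %s ( id VARCHAR(36) NOT NULL, offset_key VARCHAR(1255), "
          "offset_val VARCHAR(1255), record_insert_ts TIMESTAMP NOT NULL, "
          "record_insert_seq INTEGER NOT NULL)")
SELECT = ("SELECT id, offset_key, offset_val FROM %s "
          "ORDER BY record_insert_ts, record_insert_seq")
INSERT = ("INSERT INTO %s(id, offset_key, offset_val, record_insert_ts, "
          "record_insert_seq) VALUES ( ?, ?, ?, ?, ? )")
DELETE = "DELETE FROM %s"

conn = sqlite3.connect(":memory:")
conn.execute(CREATE % TABLE)  # %s is replaced by the table name


def write_offsets(offsets):
    """Replace the stored offsets (assumed pattern: delete all, then insert)."""
    conn.execute(DELETE % TABLE)
    now = datetime.now(timezone.utc).isoformat(timespec="microseconds")
    for seq, (key, val) in enumerate(offsets.items()):
        # The five ? markers bind id, key, value, timestamp, and sequence.
        conn.execute(INSERT % TABLE, (str(uuid.uuid4()), key, val, now, seq))
    conn.commit()


def read_offsets():
    """Read the offsets back in insertion order."""
    rows = conn.execute(SELECT % TABLE).fetchall()
    return {key: val for _id, key, val in rows}


write_offsets({"server1": "pos=10"})
write_offsets({"server1": "pos=25", "server2": "pos=7"})
print(read_offsets())
```

If you override a default statement for a particular database dialect, the sketch suggests keeping the same column order and the same number of parameters, so that the bound values still line up.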
Internal schema history store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
JDBC driver connection string to connect to the database. |
|
No default |
(Optional) The username through which Debezium connects to the database that stores schema history data. |
|
No default |
(Optional) Password for the user specified by |
|
3 seconds |
(Optional) Specifies the time, in milliseconds, that the connector waits to retry the connection after an attempt to connect to the internal schema history database fails. |
|
5 |
(Optional) Specifies the maximum number of times that Debezium retries the connection to the internal schema history database after a connection failure. |
|
|
The name of the table where Debezium stores the internal schema history. |
|
The DDL statement used to create a table to store the internal schema history. |
||
The |
||
The |
||
The |
Deprecated configuration prior to 3.2
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
JDBC driver connection string to connect to the database. |
|
No default |
(Optional) The username through which Debezium connects to the database that stores internal schema history data. |
|
No default |
(Optional) Password for the user specified by |
|
3 seconds |
(Optional) Specifies the time, in milliseconds, that the connector waits to retry the connection after an attempt to connect to the internal schema history database fails. |
|
5 |
(Optional) Specifies the maximum number of times that Debezium retries the connection to the internal schema history database after a connection failure. |
|
|
The name of the table where Debezium stores the internal schema history. |
|
The DDL statement used to create the internal schema history storage table. |
||
The |
||
The |
||
The |
History Table defaults
CREATE TABLE %s ( id VARCHAR(36) NOT NULL, history_data VARCHAR(65000), history_data_seq INTEGER, record_insert_ts TIMESTAMP NOT NULL, record_insert_seq INTEGER NOT NULL )
SELECT id, history_data, history_data_seq FROM %s ORDER BY record_insert_ts, record_insert_seq, id, history_data_seq
SELECT * FROM %s LIMIT 1
INSERT INTO %s(id, history_data, history_data_seq, record_insert_ts, record_insert_seq) VALUES ( ?, ?, ?, ?, ? )
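The `history_data_seq` column suggests that one schema history record can span several rows. The following sketch runs the default statements against an in-memory SQLite database to show how that could work. The table name is hypothetical, SQLite stands in for the target database, and splitting a record into chunks that share one `id` is an assumption made for illustration rather than a description of Debezium's code.

```python
import sqlite3
import uuid
from datetime import datetime, timezone

TABLE = "debezium_database_history"  # hypothetical table name

# Default statements, copied from the list above.
CREATE = ("CREATE TABLE %s ( id VARCHAR(36) NOT NULL, history_data VARCHAR(65000), "
          "history_data_seq INTEGER, record_insert_ts TIMESTAMP NOT NULL, "
          "record_insert_seq INTEGER NOT NULL )")
SELECT = ("SELECT id, history_data, history_data_seq FROM %s "
          "ORDER BY record_insert_ts, record_insert_seq, id, history_data_seq")
EXISTS = "SELECT * FROM %s LIMIT 1"
INSERT = ("INSERT INTO %s(id, history_data, history_data_seq, record_insert_ts, "
          "record_insert_seq) VALUES ( ?, ?, ?, ?, ? )")


def store_record(conn, text, record_seq, chunk=65000):
    """Store one record, split into chunks that fit the history_data column."""
    record_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat(timespec="microseconds")
    for part_seq, start in enumerate(range(0, len(text), chunk)):
        part = text[start:start + chunk]
        conn.execute(INSERT % TABLE, (record_id, part, part_seq, now, record_seq))
    conn.commit()


def load_records(conn):
    """Reassemble records by concatenating chunks that share the same id."""
    records = {}
    for record_id, data, _part_seq in conn.execute(SELECT % TABLE):
        records[record_id] = records.get(record_id, "") + data
    return list(records.values())


conn = sqlite3.connect(":memory:")
conn.execute(CREATE % TABLE)
has_data_before = conn.execute(EXISTS % TABLE).fetchone() is not None

# A tiny chunk size makes the splitting visible.
store_record(conn, "AAAABBBBCC", record_seq=0, chunk=4)
store_record(conn, "DDD", record_seq=1, chunk=4)

has_data_after = conn.execute(EXISTS % TABLE).fetchone() is not None
print(load_records(conn))
```

The `ORDER BY` clause in the default `SELECT` statement is what lets the chunks be concatenated in the right order, so a custom statement should preserve that ordering.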
Redis
Debezium can use a Jedis client to store data in a Redis cache.
Offset Store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
metadata:debezium:offsets |
The Redis key that Debezium uses to store offsets. |
|
No default |
The URL through which Debezium connects to Redis to store offset data. |
|
No default |
The user account through which Debezium connects to Redis to store offset data. |
|
No default |
The password for the user account through which Debezium connects to Redis to store offset data. |
|
0 |
The database index (0..15) that Debezium uses to access Redis to store offset data. |
|
false |
Specifies whether Debezium uses SSL when communicating with Redis to store offset data. |
|
2000 |
Specifies the time, in milliseconds, that Debezium waits to establish a connection to Redis before the connection times out. |
|
2000 |
Specifies the interval, in milliseconds, that Debezium allows for exchanging offset data with Redis before the socket times out. If a data packet is not transferred within the specified interval, Debezium closes the socket. |
|
300 |
Specifies the time, in milliseconds, that Debezium waits to retry the connection after an initial attempt to connect to Redis fails. |
|
10000 |
Specifies the maximum time, in milliseconds, that Debezium waits to retry the connection after an attempt to connect to Redis fails. |
|
10 |
Specifies the maximum number of times that Debezium retries the connection to Redis after connection attempts fail. |
|
false |
In Redis environments that are configured to use a replica shard, specifies whether Debezium waits for Redis to verify that it wrote data to the replica. |
|
1000 |
Specifies the time, in milliseconds, that Debezium waits for confirmation that Redis wrote data to a replica shard before the request times out. |
|
false |
Specifies whether Debezium retries failed requests to confirm whether data is written to a replica shard. |
|
1000 |
Specifies the time, in milliseconds, that Debezium waits after a failure before it resubmits a request to Redis to confirm data is written to a replica shard. |
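The initial retry delay, maximum retry delay, and maximum number of retries in the table above interact to bound how long Debezium keeps trying to connect. The following sketch computes one possible retry schedule from the default values. It assumes that the delay doubles after each failed attempt until it reaches the maximum; the table states only the initial and maximum values, so the growth rule and the function name `retry_delays` are illustrative assumptions.

```python
def retry_delays(initial_ms=300, max_ms=10000, max_attempts=10):
    """Return the wait before each retry, assuming the delay doubles up to max_ms."""
    delays = []
    delay = initial_ms
    for _ in range(max_attempts):
        # Cap every wait at the configured maximum delay.
        delays.append(min(delay, max_ms))
        delay *= 2
    return delays


schedule = retry_delays()
print(schedule)
print(sum(schedule), "ms spent waiting in total")
```

Under these assumptions, the defaults produce waits of 300, 600, 1200, 2400, 4800, and 9600 ms, followed by four waits of 10000 ms, for a total of 58900 ms before the retries are exhausted.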
Internal schema history store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
metadata:debezium:schema_history |
The Redis key that Debezium uses to store the schema history data. |
|
No default |
The URL through which Debezium connects to Redis to store schema history data. |
|
No default |
The user account through which Debezium connects to Redis to store schema history data. |
|
No default |
The password for the user account through which Debezium connects to Redis to store schema history data. |
|
0 |
The database index (0..15) that Debezium uses to access Redis to store schema history data. |
|
false |
Specifies whether Debezium uses SSL when communicating with Redis to store schema history data. |
|
2000 |
Specifies the time, in milliseconds, that Debezium waits to establish a connection to Redis before the connection times out. |
|
2000 |
Specifies the interval, in milliseconds, that Debezium allows for exchanging schema history data with Redis. If a data packet is not transferred within the specified interval, Debezium closes the socket. |
|
300 |
Specifies the time, in milliseconds, that Debezium waits to retry the connection after an initial attempt to connect to Redis fails. |
|
10000 |
Specifies the maximum time, in milliseconds, that Debezium waits to retry the connection after an attempt to connect to Redis fails. |
|
10 |
Specifies the maximum number of times that Debezium retries the connection to Redis after connection attempts fail. |
|
false |
In Redis environments that are configured to use a replica shard, specifies whether Debezium waits for Redis to verify that it wrote data to the replica. |
|
1000 |
Specifies the time, in milliseconds, that Debezium waits for confirmation that Redis wrote data to a replica shard before the request times out. |
|
false |
Specifies whether Debezium retries failed requests to confirm whether data is written to a replica shard. |
Amazon S3
Debezium can use the Amazon S3 object storage service. Typically, you would use S3 storage when you deploy Debezium with Amazon Managed Streaming for Apache Kafka (Amazon MSK).
Internal schema history store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
(Optional) The identifier of the static access key that Debezium uses to authenticate with S3. |
|
No default |
(Optional) The Amazon Web Services (AWS) secret key that Debezium uses to authenticate to S3. |
|
No default |
(Optional) Specifies the name of the region that hosts the S3 bucket. |
|
No default |
Specifies the name of the S3 bucket that stores the schema history. |
|
No default |
Specifies the object name in the bucket that stores the schema history. |
|
No default |
(Optional) Specifies a custom URL that Debezium uses to access the S3 service. |
Azure Blob Storage
Debezium can use the Azure Blob storage service to store data. Typically, you would use Azure Blob storage when you deploy Debezium in the Apache Kafka in Azure HDInsight service.
Internal schema history store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
Specifies the Azure Blob storage connection string. |
|
No default |
The name of the account that Debezium uses to connect to Azure. |
|
No default |
The name of the Azure container in which Debezium stores data. |
|
No default |
The name of the blob where Debezium stores data. |
RocketMQ
Debezium can use the RocketMqSchemaHistory class to store and retrieve database schema changes in Apache RocketMQ.
Internal schema history store
Property | Default | Description |
---|---|---|
No default |
Must be set to |
|
No default |
The name of the RocketMQ topic where Debezium stores the database schema history. |
|
No default |
Specifies the host and port where the Apache RocketMQ NameServer discovery service is available. |
|
false |
Specifies whether to enable access control lists in RocketMQ. |
|
No default |
Specifies the RocketMQ access key.
This field must include a value if |
|
No default |
Specifies the RocketMQ secret key.
This field must include a value if |
|
No default |
Specifies the number of consecutive attempts in which RocketMQ returns no data before recovery completes. |
|
No default |
Specifies the time, in milliseconds, that Debezium waits after each poll attempt to recover the history. |
|
No default |
Specifies the time, in milliseconds, that Debezium waits for a write to RocketMQ to complete before the operation times out. |