Debezium connector for PostgreSQL
- Overview
- How the connector works
- Data change events
- Data type mappings
- Custom converters
- Setting up Postgres
- PostgreSQL in the Cloud
- Installing the logical decoding output plug-in
- Plug-in differences
- Configuring the PostgreSQL server
- Setting up permissions
- Setting privileges to enable Debezium to create PostgreSQL publications when you use `pgoutput`
- Configuring PostgreSQL to allow replication with the Debezium connector host
- Supported PostgreSQL topologies
- WAL disk space consumption
- Setting up multiple connectors for same database server
- Upgrading PostgreSQL
- Deployment
- Monitoring
- Behavior when things go wrong
The Debezium PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. For information about the PostgreSQL versions that are compatible with the connector, see the Debezium release overview.
The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.
Overview
PostgreSQL’s logical decoding feature was introduced in version 9.4. It is a mechanism that allows the extraction of the changes that were committed to the transaction log and the processing of these changes in a user-friendly manner with the help of an output plug-in. The output plug-in enables clients to consume the changes.
The PostgreSQL connector contains two main parts that work together to read and process database changes:
- A logical decoding output plug-in. You might need to install the output plug-in that you choose to use. You must configure a replication slot that uses your chosen output plug-in before running the PostgreSQL server. The plug-in can be one of the following:
  - `decoderbufs` is based on Protobuf and maintained by the Debezium community.
  - `pgoutput` is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community, and used by PostgreSQL itself for logical replication. This plug-in is always present so no additional libraries need to be installed. The Debezium connector interprets the raw replication event stream directly into change events.
- Java code (the actual Kafka Connect connector) that reads the changes produced by the chosen logical decoding output plug-in. It uses PostgreSQL’s streaming replication protocol, by means of the PostgreSQL JDBC driver.
The connector produces a change event for every row-level insert, update, and delete operation that was captured and sends change event records for each table in a separate Kafka topic. Client applications read the Kafka topics that correspond to the database tables of interest, and can react to every row-level event they receive from those topics.
PostgreSQL normally purges write-ahead log (WAL) segments after some period of time. This means that the connector does not have the complete history of all changes that have been made to the database. Therefore, when the PostgreSQL connector first connects to a particular PostgreSQL database, it starts by performing a consistent snapshot of each of the database schemas. After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made. This way, the connector starts with a consistent view of all of the data, and does not omit any changes that were made while the snapshot was being taken.
The connector is tolerant of failures. As the connector reads changes and produces events, it records the WAL position for each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart the connector continues reading the WAL where it last left off. This includes snapshots. If the connector stops during a snapshot, the connector begins a new snapshot when it restarts.
The connector relies on and reflects the PostgreSQL logical decoding feature, which has the following limitations:
Additionally, the Behavior when things go wrong section describes how the connector responds if there is a problem.
Debezium currently supports databases with UTF-8 character encoding only. With a single-byte character encoding, it is not possible to correctly process strings that contain extended ASCII code characters.
How the connector works
To optimally configure and run a Debezium PostgreSQL connector, it is helpful to understand how the connector performs snapshots, streams change events, determines Kafka topic names, and uses metadata.
Security
To use the Debezium connector to stream changes from a PostgreSQL database, the connector must operate with specific privileges in the database.
Although one way to grant the necessary privileges is to provide the user with `superuser` privileges, doing so potentially exposes your PostgreSQL data to unauthorized access.
Rather than granting excessive privileges to the Debezium user, it is best to create a dedicated Debezium replication user to which you grant specific privileges.
For more information about configuring privileges for the Debezium PostgreSQL user, see Setting up permissions. For more information about PostgreSQL logical replication security, see the PostgreSQL documentation.
Snapshots
Most PostgreSQL servers are configured to not retain the complete history of the database in the WAL segments. This means that the PostgreSQL connector would be unable to see the entire history of the database by reading only the WAL. Consequently, the first time that the connector starts, it performs an initial consistent snapshot of the database.
The default behavior for performing a snapshot consists of the following steps. You can change this behavior by setting the `snapshot.mode` connector configuration property to a value other than `initial`.
1. Start a transaction with a SERIALIZABLE, READ ONLY, DEFERRABLE isolation level to ensure that subsequent reads in this transaction are against a single consistent version of the data. Any changes to the data due to subsequent `INSERT`, `UPDATE`, and `DELETE` operations by other clients are not visible to this transaction.
2. Read the current position in the server’s transaction log.
3. Scan the database tables and schemas, generate a `READ` event for each row, and write that event to the appropriate table-specific Kafka topic.
4. Commit the transaction.
5. Record the successful completion of the snapshot in the connector offsets.
If the connector fails, is rebalanced, or stops after Step 1 begins but before Step 5 completes, upon restart the connector begins a new snapshot. After the connector completes its initial snapshot, the PostgreSQL connector continues streaming from the position that it read in Step 2. This ensures that the connector does not miss any updates. If the connector stops again for any reason, upon restart, the connector continues streaming changes from where it previously left off.
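The restart rule in the preceding paragraph can be modeled in a few lines of Java. This is a minimal sketch for the default `initial` mode, not the connector's implementation. The `SnapshotRestartSketch` and `StoredOffset` classes and their fields are hypothetical stand-ins for the state that the connector keeps in its offsets.

```java
import java.util.Optional;

/** Simplified model of the documented restart behavior; all names are illustrative. */
final class SnapshotRestartSketch {

    enum Action { START_NEW_SNAPSHOT, RESUME_STREAMING }

    /** Hypothetical view of what the connector has recorded in its offsets. */
    static final class StoredOffset {
        final boolean snapshotCompleted; // recorded in Step 5
        final String walPosition;        // position read in Step 2, or the last streamed position

        StoredOffset(boolean snapshotCompleted, String walPosition) {
            this.snapshotCompleted = snapshotCompleted;
            this.walPosition = walPosition;
        }
    }

    /** Decides what to do on startup, based on whether a completed snapshot was recorded. */
    static Action onRestart(Optional<StoredOffset> offset) {
        // No offset, or a snapshot that never reached Step 5: begin a new snapshot.
        if (!offset.isPresent() || !offset.get().snapshotCompleted) {
            return Action.START_NEW_SNAPSHOT;
        }
        // A completed snapshot was recorded: continue streaming from the stored position.
        return Action.RESUME_STREAMING;
    }
}
```

The key point of the model is that a partially completed snapshot is never resumed; only the completion record written in Step 5 lets the connector skip the snapshot on restart.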
Option | Description |
---|---|
|
The connector always performs a snapshot when it starts. After the snapshot completes, the connector continues streaming changes from step 3 in the above sequence. This mode is useful in these situations:
|
|
The connector performs a database snapshot when no Kafka offsets topic exists. After the database snapshot completes the Kafka offsets topic is written. If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. |
|
The connector performs a database snapshot and stops before streaming any change event records. If the connector had started but did not complete a snapshot before stopping, the connector restarts the snapshot process and stops when the snapshot completes. |
|
The connector never performs snapshots. When a connector is configured this way, after it starts, it behaves as follows: If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN is stored, the connector starts streaming changes from the point at which the PostgreSQL logical replication slot was created on the server. Use this snapshot mode only when you know that all data of interest is still reflected in the WAL. |
|
Deprecated, see |
|
After the connector starts, it performs a snapshot only if it detects one of the following circumstances:
|
|
Set the snapshot mode to |
|
The |
Ad hoc snapshots
By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only.
However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing table data, Debezium includes an option to perform ad hoc snapshots. You might want to perform an ad hoc snapshot after any of the following changes occur in your Debezium environment:
- The connector configuration is modified to capture a different set of tables.
- Kafka topics are deleted and must be rebuilt.
- Data corruption occurs due to a configuration error or some other problem.
You can re-run a snapshot for a table for which you previously captured a snapshot by initiating a so-called ad hoc snapshot. Ad hoc snapshots require the use of signaling tables. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling table.
When you initiate an ad hoc snapshot of an existing table, the connector appends content to the topic that already exists for the table. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled.
Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database.
You specify the tables to capture by sending an `execute-snapshot` message to the signaling table.
Set the type of the `execute-snapshot` signal to `incremental` or `blocking`, and provide the names of the tables to include in the snapshot, as described in the following table:
Field | Default | Value |
---|---|---|
|
|
Specifies the type of snapshot that you want to run. |
|
N/A |
An array that contains regular expressions matching the fully-qualified names of the tables to include in the snapshot. |
|
N/A |
An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot.
|
|
N/A |
An optional string that specifies the column name that the connector uses as the primary key of a table during the snapshot process. |
You initiate an ad hoc incremental snapshot by adding an entry with the `execute-snapshot` signal type to the signaling table, or by sending a signal message to a Kafka signaling topic.
After the connector processes the message, it begins the snapshot operation.
The snapshot process reads the first and last primary key values and uses those values as the start and end point for each table.
Based on the number of entries in the table, and the configured chunk size, Debezium divides the table into chunks, and proceeds to snapshot each chunk, in succession, one at a time.
For more information, see Incremental snapshots.
You initiate an ad hoc blocking snapshot by adding an entry with the `execute-snapshot` signal type to the signaling table or signaling topic.
After the connector processes the message, it begins the snapshot operation.
The connector temporarily stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot.
After the snapshot completes, the connector resumes streaming.
For more information, see Blocking snapshots.
Incremental snapshots
To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document.
In an incremental snapshot, instead of capturing the full state of a database all at once, as in an initial snapshot, Debezium captures each table in phases, in a series of configurable chunks. You can specify the tables that you want the snapshot to capture and the size of each chunk. The chunk size determines the number of rows that the snapshot collects during each fetch operation on the database. The default chunk size for incremental snapshots is 1024 rows.
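To make the chunking arithmetic concrete, the following Java sketch splits an already key-ordered list of primary keys into consecutive chunks. It is an in-memory illustration only: the connector fetches each chunk from the database rather than holding all keys in memory, and the `ChunkingSketch` class and its `split` method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration of splitting key-ordered rows into fixed-size chunks. */
final class ChunkingSketch {

    /** Default chunk size for incremental snapshots, as stated in the text. */
    static final int DEFAULT_CHUNK_SIZE = 1024;

    /** Splits a list of primary keys, already sorted by key, into consecutive chunks. */
    static <K> List<List<K>> split(List<K> orderedKeys, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<K>> chunks = new ArrayList<>();
        for (int start = 0; start < orderedKeys.size(); start += chunkSize) {
            // The last chunk may be smaller than chunkSize.
            int end = Math.min(start + chunkSize, orderedKeys.size());
            chunks.add(new ArrayList<>(orderedKeys.subList(start, end)));
        }
        return chunks;
    }
}
```

With the default chunk size, a table of 2,500 rows yields three chunks: two of 1,024 rows and a final chunk of 452 rows.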
As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each table row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process:
- You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other.
- If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the table from the beginning.
- You can run an incremental snapshot on demand at any time, and repeat the process as needed to adapt to database updates. For example, you might re-run a snapshot after you modify the connector configuration to add a table to its `table.include.list` property.
When you run an incremental snapshot, Debezium sorts each table by primary key and then splits the table into chunks based on the configured chunk size.
Working chunk by chunk, it then captures each table row in a chunk.
For each row that it captures, the snapshot emits a `READ` event.
That event represents the value of the row when the snapshot for the chunk began.
As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying table records.
To reflect such changes, `INSERT`, `UPDATE`, or `DELETE` operations are committed to the transaction log as per usual.
Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka.
In some cases, the `UPDATE` or `DELETE` events that the streaming process emits are received out of sequence.
That is, the streaming process might emit an event that modifies a table row before the snapshot captures the chunk that contains the `READ` event for that row.
When the snapshot eventually emits the corresponding `READ` event for the row, its value is already superseded.
To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions.
Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka.
To assist in resolving collisions between late-arriving `READ` events and streamed events that modify the same table row, Debezium employs a so-called snapshot window.
The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified table chunk.
Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic.
But from the moment that the snapshot for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key.
For each data collection, Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic.
The snapshot records that it captures directly from a table are emitted as `READ` operations.
Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits `UPDATE` or `DELETE` operations for each change.
As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer.
During the snapshot window, the primary keys of the `READ` events in the buffer are compared to the primary keys of the incoming streamed events.
If no match is found, the streamed event record is sent directly to Kafka.
If Debezium detects a match, it discards the buffered `READ` event, and writes the streamed record to the destination topic, because the streamed event logically supersedes the static snapshot event.
After the snapshot window for the chunk closes, the buffer contains only `READ` events for which no related transaction log events exist.
Debezium emits these remaining `READ` events to the table’s Kafka topic.
The connector repeats the process for each snapshot chunk.
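The buffering scheme described above can be sketched as a small Java class. This is a simplified model under stated assumptions, not Debezium's implementation: events are plain strings, primary keys are strings, and the `emitted` list stands in for the destination Kafka topic. The `SnapshotWindowSketch` class and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the snapshot window de-duplication for a single chunk. */
final class SnapshotWindowSketch {

    // Buffered READ events for the current chunk, keyed by primary key.
    private final Map<String, String> bufferedReads = new LinkedHashMap<>();
    // Stand-in for the destination Kafka topic.
    private final List<String> emitted = new ArrayList<>();
    private boolean windowOpen;

    /** Opens the window and buffers the READ events captured for the chunk. */
    void openWindow(Map<String, String> readEventsByKey) {
        bufferedReads.clear();
        bufferedReads.putAll(readEventsByKey);
        windowOpen = true;
    }

    /** Handles an event from the transaction log. Streamed events are always emitted. */
    void onStreamedEvent(String primaryKey, String event) {
        if (windowOpen) {
            // A streamed change supersedes a buffered READ for the same key.
            bufferedReads.remove(primaryKey);
        }
        emitted.add(event);
    }

    /** Closes the window and emits the READ events that were not superseded. */
    void closeWindow() {
        emitted.addAll(bufferedReads.values());
        bufferedReads.clear();
        windowOpen = false;
    }

    List<String> emitted() {
        return emitted;
    }
}
```

For a chunk with keys 1, 2, and 3, a streamed `UPDATE` for key 2 that arrives while the window is open causes the buffered `READ` for key 2 to be dropped, so the topic receives the `UPDATE` followed by the `READ` events for keys 1 and 3 only.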
Currently, you can use either of the following methods to initiate an incremental snapshot:
The Debezium connector for PostgreSQL does not support schema changes while an incremental snapshot is running.
If a schema change is performed before the incremental snapshot start but after sending the signal then passthrough config option |
Triggering an incremental snapshot
To initiate an incremental snapshot, you can send an ad hoc snapshot signal to the signaling table on the source database.
You submit snapshot signals as SQL `INSERT` queries.
After Debezium detects the change in the signaling table, it reads the signal, and runs the requested snapshot operation.
The query that you submit specifies the tables to include in the snapshot, and, optionally, specifies the type of snapshot operation.
Debezium currently supports the `incremental` and `blocking` snapshot types.
To specify the tables to include in the snapshot, provide a `data-collections` array that lists the tables, or an array of regular expressions used to match tables, for example,
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
The `data-collections` array for an incremental snapshot signal has no default value.
If the `data-collections` array is empty, Debezium interprets the empty array to mean that no action is required, and it does not perform a snapshot.
If the name of a table that you want to include in a snapshot contains a dot ( |
Prerequisites
- A signaling data collection exists on the source database.
- The signaling data collection is specified in the `signal.data.collection` property.
Procedure
1. Send a SQL query to add the ad hoc incremental snapshot request to the signaling table:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualifiedTableName>","<fullyQualifiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualifiedTableName>", "filter": "<additional-condition>"}]}');
For example,
INSERT INTO myschema.debezium_signal (id, type, data) (1)
values ('ad-hoc-1', (2)
    'execute-snapshot', (3)
    '{"data-collections": ["schema1.table1", "schema1.table2"], (4)
    "type":"incremental", (5)
    "additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=''blue''"}]}'); (6)
The values of the `id`, `type`, and `data` parameters in the command correspond to the fields of the signaling table.
The following table describes the parameters in the example:
Table 3. Descriptions of fields in a SQL command for sending an incremental snapshot signal to the signaling table
Item | Value | Description |
---|---|---|
1 | `schema.debezium_signal` | Specifies the fully-qualified name of the signaling table on the source database. |
2 | `ad-hoc-1` | The `id` parameter specifies an arbitrary string that is assigned as the `id` identifier for the signal request. Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. Rather, during the snapshot, Debezium generates its own `id` string as a watermarking signal. |
3 | `execute-snapshot` | The `type` parameter specifies the operation that the signal is intended to trigger. |
4 | `data-collections` | A required component of the `data` field of a signal that specifies an array of table names or regular expressions to match table names to include in the snapshot. The array lists regular expressions that use the format `schema.table` to match the fully-qualified names of the tables. This format is the same as the one that you use to specify the name of the connector’s signaling table. |
5 | `incremental` | An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. Valid values are `incremental` and `blocking`. If you do not specify a value, the connector defaults to performing an incremental snapshot. |
6 | `additional-conditions` | An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. Each additional condition is an object with `data-collection` and `filter` properties. You can specify different filters for each data collection. The `data-collection` property is the fully-qualified name of the data collection that the filter applies to. For more information about the `additional-conditions` parameter, see Running an ad hoc incremental snapshots with `additional-conditions`. |
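If you send the signal from application code rather than from a SQL console, a parameterized statement is one way to do it. The following Java sketch assumes a signaling table with `id`, `type`, and `data` columns, as in the example above. The `ExecuteSnapshotSignal` class and its methods are hypothetical, and the JSON builder does no escaping, so it assumes table names without quotes or backslashes.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for sending an execute-snapshot signal over JDBC. */
final class ExecuteSnapshotSignal {

    /** Builds the JSON for the signal's data column (no escaping; simple names only). */
    static String dataJson(List<String> dataCollections, String snapshotType) {
        String tables = dataCollections.stream()
                .map(table -> "\"" + table + "\"")
                .collect(Collectors.joining(", "));
        return "{\"data-collections\": [" + tables + "], \"type\": \"" + snapshotType + "\"}";
    }

    /**
     * Inserts the signal row. The signalTable name is concatenated into the SQL because
     * identifiers cannot be bound as parameters, so it must come from trusted configuration.
     */
    static void send(Connection connection, String signalTable, String id, String dataJson)
            throws SQLException {
        String sql = "INSERT INTO " + signalTable + " (id, type, data) VALUES (?, ?, ?)";
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setString(1, id);                 // arbitrary identifier, for example "ad-hoc-1"
            statement.setString(2, "execute-snapshot"); // signal type
            statement.setString(3, dataJson);           // JSON payload
            statement.executeUpdate();
        }
    }
}
```

Binding the JSON as a parameter avoids having to escape single quotes inside a `filter` value by hand.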
Running an ad hoc incremental snapshots with additional-conditions
If you want a snapshot to include only a subset of the content in a table, you can modify the signal request by appending an `additional-conditions` parameter to the snapshot signal.
The SQL query for a typical snapshot takes the following form:
SELECT * FROM <tableName> ....
By adding an `additional-conditions` parameter, you append a `WHERE` condition to the SQL query, as in the following example:
SELECT * FROM <data-collection> WHERE <filter> ....
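The effect of the filter on the query text can be illustrated with a short Java sketch. It builds only the leading part of the statement; the remaining clauses that the `....` in the forms above stands for are not modeled. The `SnapshotQuerySketch` class and its `selectFor` method are hypothetical names, not part of Debezium.

```java
/** Illustration of how an additional-conditions filter changes the snapshot query text. */
final class SnapshotQuerySketch {

    /** Returns the leading part of the snapshot query, with or without a WHERE condition. */
    static String selectFor(String dataCollection, String filter) {
        String base = "SELECT * FROM " + dataCollection;
        // Without a filter, the query selects every row of the data collection.
        if (filter == null || filter.trim().isEmpty()) {
            return base;
        }
        // With a filter, the condition is appended verbatim as a WHERE clause.
        return base + " WHERE " + filter;
    }
}
```

Because the filter text is appended as written, it must be a valid SQL condition for the table, including quotes around string literals.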
The following example shows a SQL query to send an ad hoc incremental snapshot request with an additional condition to the signaling table:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualifiedTableName>","<fullyQualifiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualifiedTableName>", "filter": "<additional-condition>"}]}');
For example, suppose you have a `products` table that contains the following columns:
- `id` (primary key)
- `color`
- `quantity`
If you want an incremental snapshot of the `products` table to include only the data items where `color='blue'`, you can use the following SQL statement to trigger the snapshot:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=''blue''"}]}');
The `additional-conditions` parameter also enables you to pass conditions that are based on more than one column.
For example, using the `products` table from the previous example, you can submit a query that triggers an incremental snapshot that includes the data of only those items for which `color='blue'` and `quantity>10`:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=''blue'' AND quantity>10"}]}');
The following example shows the JSON for an incremental snapshot event that is captured by a connector.
{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" (1)
    },
    "op":"r", (2)
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
Item | Field name | Description |
---|---|---|
1 | `snapshot` | Specifies the type of snapshot operation to run. |
2 | `op` | Specifies the event type. |
Using the Kafka signaling channel to trigger an incremental snapshot
You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot.
The key of the Kafka message must match the value of the `topic.prefix` connector configuration option.
The value of the message is a JSON object with `type` and `data` fields.
The signal type is `execute-snapshot`, and the `data` field must have the following fields:
Field | Default | Value |
---|---|---|
|
|
The type of the snapshot to be executed.
Currently Debezium supports the |
|
N/A |
An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. |
|
N/A |
An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. |
The following example shows a typical `execute-snapshot` Kafka message:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
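The key and value of such a message can be assembled as shown in the following Java sketch. It only builds the two strings; publishing them to the signaling topic with a Kafka producer is not shown. The `KafkaSignalSketch` class is a hypothetical helper, and the JSON is built without escaping, so it assumes table names without quotes or backslashes.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical holder for the key and value of an execute-snapshot Kafka signal. */
final class KafkaSignalSketch {

    final String key;   // must equal the connector's topic.prefix value
    final String value; // JSON object with "type" and "data" fields

    private KafkaSignalSketch(String key, String value) {
        this.key = key;
        this.value = value;
    }

    /** Builds the message for an incremental snapshot of the given tables. */
    static KafkaSignalSketch executeSnapshot(String topicPrefix, List<String> dataCollections) {
        String tables = dataCollections.stream()
                .map(table -> "\"" + table + "\"")
                .collect(Collectors.joining(", "));
        String value = "{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": ["
                + tables + "], \"type\": \"INCREMENTAL\"}}";
        return new KafkaSignalSketch(topicPrefix, value);
    }
}
```

Deriving the key from the same setting that supplies `topic.prefix` keeps the two values from drifting apart, which matters because the message key must match that option.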
Debezium uses the `additional-conditions` field to select a subset of a table’s content.
Typically, when Debezium runs a snapshot, it runs a SQL query such as:
SELECT * FROM <tableName> ….
When the snapshot request includes an `additional-conditions` property, the `data-collection` and `filter` parameters of the property are appended to the SQL query, for example:
SELECT * FROM <data-collection> WHERE <filter> ….
For example, given a `products` table with the columns `id` (primary key), `color`, and `brand`, if you want a snapshot to include only content for which `color='blue'`, when you request the snapshot, you could add the `additional-conditions` property to filter the content:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`
You can also use the `additional-conditions` property to pass conditions based on multiple columns.
For example, using the same `products` table as in the previous example, if you want a snapshot to include only the content from the `products` table for which `color='blue'` and `brand='MyBrand'`, you could send the following request:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
Stopping an incremental snapshot
In some situations, it might be necessary to stop an incremental snapshot. For example, you might realize that the snapshot was not configured correctly, or you might want to ensure that resources are available for other database operations. You can stop a snapshot that is already running by sending a signal to the signaling table on the source database.
You submit a stop snapshot signal to the signaling table by sending it in a SQL `INSERT` query.
The stop-snapshot signal specifies the `type` of the snapshot operation as `incremental`, and optionally specifies the tables that you want to omit from the currently running snapshot.
After Debezium detects the change in the signaling table, it reads the signal, and stops the incremental snapshot operation if it’s in progress.
You can also stop an incremental snapshot by sending a JSON message to the Kafka signaling topic.
Prerequisites
- A signaling data collection exists on the source database.
- The signaling data collection is specified in the `signal.data.collection` property.
Procedure
1. Send a SQL query to stop the ad hoc incremental snapshot to the signaling table:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualifiedTableName>","<fullyQualifiedTableName>"],"type":"incremental"}');
For example,
INSERT INTO myschema.debezium_signal (id, type, data) (1)
values ('ad-hoc-1', (2)
    'stop-snapshot', (3)
    '{"data-collections": ["schema1.table1", "schema1.table2"], (4)
    "type":"incremental"}'); (5)
The values of the `id`, `type`, and `data` parameters in the signal command correspond to the fields of the signaling table.
The following table describes the parameters in the example:
Table 6. Descriptions of fields in a SQL command for sending a stop incremental snapshot signal to the signaling table
Item | Value | Description |
---|---|---|
1 | `schema.debezium_signal` | Specifies the fully-qualified name of the signaling table on the source database. |
2 | `ad-hoc-1` | The `id` parameter specifies an arbitrary string that is assigned as the `id` identifier for the signal request. Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. |
3 | `stop-snapshot` | The `type` parameter specifies the operation that the signal is intended to trigger. |
4 | `data-collections` | An optional component of the `data` field of a signal that specifies an array of table names or regular expressions to match table names to remove from the snapshot. The array lists regular expressions which match tables by their fully-qualified names in the format `schema.table`. If you omit this component from the `data` field, the signal stops the entire incremental snapshot that is in progress. |
5 | `incremental` | A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. Currently, the only valid option is `incremental`. If you do not specify a `type` value, the signal fails to stop the incremental snapshot. |
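The effect of the optional `data-collections` component can be modeled with a short Java sketch. This is an illustration of the documented behavior, not Debezium's code. It assumes that each regular expression must match the whole fully-qualified table name, and that an empty array is treated like an omitted one. The `StopSnapshotSketch` class and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Model of which tables remain in a running snapshot after a stop-snapshot signal. */
final class StopSnapshotSketch {

    /**
     * @param inProgress      fully-qualified names of tables still to be captured
     * @param dataCollections regular expressions from the signal, or null if omitted
     * @return the tables that the snapshot continues to capture
     */
    static List<String> remainingAfterStop(List<String> inProgress, List<String> dataCollections) {
        // Omitted component (assumption: also an empty array) stops the entire snapshot.
        if (dataCollections == null || dataCollections.isEmpty()) {
            return new ArrayList<>();
        }
        List<String> remaining = new ArrayList<>();
        for (String table : inProgress) {
            boolean removed = false;
            for (String regex : dataCollections) {
                // Assumption: the expression must match the full table name.
                if (Pattern.matches(regex, table)) {
                    removed = true;
                    break;
                }
            }
            if (!removed) {
                remaining.add(table);
            }
        }
        return remaining;
    }
}
```

For a snapshot of `schema1.table1`, `schema1.table2`, and `schema1.table3`, a signal that lists the first two tables leaves only `schema1.table3` in progress, whereas a signal without `data-collections` leaves nothing.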
Using the Kafka signaling channel to stop an incremental snapshot
You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot.
The key of the Kafka message must match the value of the `topic.prefix` connector configuration option.
The value of the message is a JSON object with `type` and `data` fields.
The signal type is `stop-snapshot`, and the `data` field must have the following fields:
Field | Default | Value |
---|---|---|
|
|
The type of the snapshot to be executed.
Currently Debezium supports only the |
|
N/A |
An optional array of comma-separated regular expressions that match the fully-qualified names of the tables to remove from the snapshot. |
The following example shows a typical `stop-snapshot` Kafka message:
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
Read-only incremental snapshots
You can configure a PostgreSQL connector that has a read-only connection to the database to run incremental snapshots without requiring a signal data collection table. To run an incremental snapshot with read-only access, the connector uses the current in-progress transaction as high and low watermarks. The state of a chunk’s window is updated by comparing the transaction ID of write-ahead-log events or heartbeat events against low and high watermarks.
To switch to a read-only implementation, set the value of the `read.only` property to `true`.
Prerequisites
- PostgreSQL version 13 or later.
Ad hoc read-only incremental snapshots
When the PostgreSQL connection is read-only, you can use any of the available signaling channels without the requirement to use the `source` channel.
Custom snapshotter SPI
For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces:
- `io.debezium.snapshot.spi.Snapshotter`: Controls whether the connector takes a snapshot.
- `io.debezium.snapshot.spi.SnapshotQuery`: Controls how data is queried during a snapshot.
- `io.debezium.snapshot.spi.SnapshotLock`: Controls whether the connector locks tables when taking a snapshot.
/**
 * {@link Snapshotter} is used to determine the following details about the snapshot process:
 * <p>
 * - Whether a snapshot occurs. <br>
 * - Whether streaming continues during the snapshot. <br>
 * - Whether the snapshot includes schema (if supported). <br>
 * - Whether to snapshot data or schema following an error.
 * <p>
 * Although Debezium provides many default snapshot modes,
 * to provide more advanced functionality, such as partial snapshots,
 * you can customize implementation of the interface.
 * For more information, see the documentation.
 */
@Incubating
public interface Snapshotter extends Configurable {

    /**
     * @return the name of the snapshotter.
     */
    String name();

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a data snapshot
     */
    boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a schema snapshot
     */
    boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @return {@code true} if the snapshotter should stream after taking a snapshot
     */
    boolean shouldStream();

    /**
     * @return {@code true} whether the schema can be recovered if database schema history is corrupted.
     */
    boolean shouldSnapshotOnSchemaError();

    /**
     * @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
     */
    boolean shouldSnapshotOnDataError();

    /**
     * @return {@code true} if streaming should resume from the start of the snapshot
     * transaction, or {@code false} for when a connector resumes and takes a snapshot,
     * streaming should resume from where streaming previously left off.
     */
    default boolean shouldStreamEventsStartingFromSnapshot() {
        return true;
    }

    /**
     * Lifecycle hook called after the snapshot phase is successful.
     */
    default void snapshotCompleted() {
        // no operation
    }

    /**
     * Lifecycle hook called after the snapshot phase is aborted.
     */
    default void snapshotAborted() {
        // no operation
    }
}
/**
 * {@link SnapshotQuery} is used to determine the query used during a data snapshot
 */
public interface SnapshotQuery extends Configurable, Service {

    /**
     * @return the name of the snapshot query.
     */
    String name();

    /**
     * Generate a valid query string for the specified table, or an empty {@link Optional}
     * to skip snapshotting this table (but that table will still be streamed from)
     *
     * @param tableId the table to generate a query for
     * @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
     *                              include/exclude filters
     * @return a valid query string, or none to skip snapshotting this table
     */
    Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);
}
/**
 * {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
 */
public interface SnapshotLock extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     */
    String name();

    /**
     * Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
     * implementation.
     */
    Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);
}
Blocking snapshots
To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector.
A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time.
You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations:
-
You add a new table and you want to complete the snapshot while the connector is running.
-
You add a large table, and you want the snapshot to complete in less time than is possible with an incremental snapshot.
When you run a blocking snapshot, Debezium stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the streaming is resumed.
You can set the following properties in the data component of a signal:
-
data-collections: Specifies the tables that you want the snapshot to include.
This property accepts a comma-separated list of regular expressions that match fully-qualified table names. The behavior of the property is similar to the behavior of the table.include.list property, which specifies the tables to capture.
-
additional-conditions: Specifies different filters for different tables.
-
The data-collection property is the fully-qualified name of the table to which the filter applies. Depending on the database, the name can be case-sensitive or case-insensitive.
-
The filter property takes the same value that you would use in snapshot.select.statement.overrides. The fully-qualified name of the table in that value must match by case.
For example:
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM schema1.table1 WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM schema1.table2 WHERE column2 > 0"}]}
A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot.
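The data component of a blocking snapshot signal can also be assembled programmatically. The following Python sketch is a hypothetical helper (the name build_blocking_snapshot_data is an assumption, not a Debezium API) that produces the structure described in this section, with filter values written in plain PostgreSQL syntax.

```python
import json


def build_blocking_snapshot_data(data_collections, filters=None):
    """Build the data component of a blocking snapshot signal.

    filters is an optional mapping of fully-qualified table name to the
    SELECT statement that should be used as that table's filter.
    Hypothetical helper for illustration only.
    """
    data = {"type": "blocking", "data-collections": list(data_collections)}
    if filters:
        # One additional-conditions entry per table that has a filter.
        data["additional-conditions"] = [
            {"data-collection": table, "filter": query}
            for table, query in filters.items()
        ]
    return data


signal_data = build_blocking_snapshot_data(
    ["schema1.table1", "schema1.table2"],
    {"schema1.table2": "SELECT * FROM schema1.table2 WHERE column2 > 0"},
)
signal_json = json.dumps(signal_data)
```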
Streaming changes
The PostgreSQL connector typically spends the vast majority of its time streaming changes from the PostgreSQL server to which it is connected. This mechanism relies on PostgreSQL’s replication protocol. This protocol enables clients to receive changes from the server as they are committed in the server’s transaction log at certain positions, which are referred to as Log Sequence Numbers (LSNs).
Whenever the server commits a transaction, a separate server process invokes a callback function from the logical decoding plug-in. This function processes the changes from the transaction, converts them to a specific format (Protobuf or JSON in the case of Debezium plug-in) and writes them on an output stream, which can then be consumed by clients.
The Debezium PostgreSQL connector acts as a PostgreSQL client. When the connector receives changes it transforms the events into Debezium create, update, or delete events that include the LSN of the event. The PostgreSQL connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.
Periodically, Kafka Connect records the most recent offset in another Kafka topic. The offset indicates source-specific position information that Debezium includes with each event. For the PostgreSQL connector, the LSN recorded in each change event is the offset.
When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset. When the connector restarts, it sends a request to the PostgreSQL server to send the events starting just after that position.
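PostgreSQL tools usually print an LSN as two hexadecimal numbers separated by a slash, whereas the lsn values in the event examples in this document are plain integers. The following Python sketch, whose function names are hypothetical, converts between the two forms by treating the part before the slash as the upper 32 bits of a 64-bit position.

```python
def lsn_to_int(lsn_text):
    """Convert a textual LSN such as '0/16E9058' to a 64-bit integer."""
    high, low = lsn_text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value):
    """Convert a 64-bit integer position back to the 'X/Y' textual form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


# The lsn value 24023128 from the event examples corresponds to 0/16E9058.
example_lsn = int_to_lsn(24023128)
```

A conversion like this can be useful when you compare an offset recorded by the connector with positions that PostgreSQL reports for a replication slot.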
The PostgreSQL connector retrieves schema information as part of the events sent by the logical decoding plug-in. However, the connector does not retrieve information about which columns compose the primary key. The connector obtains this information from the JDBC metadata (side channel). If the primary key definition of a table changes (by adding, removing or renaming primary key columns), there is a tiny period of time when the primary key information from JDBC is not synchronized with the change event that the logical decoding plug-in generates. During this tiny period, a message could be created with an inconsistent key structure. To prevent this inconsistency, update primary key structures as follows:
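-
Put the database or an application into a read-only mode.
-
Let Debezium process all remaining events.
-
Stop Debezium.
-
Update the primary key definition in the relevant table.
-
Put the database or the application into read/write mode.
-
Restart Debezium.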
PostgreSQL 10+ logical decoding support (pgoutput)
As of PostgreSQL 10, there is a logical replication stream mode, called pgoutput, that is natively supported by PostgreSQL. This means that a Debezium PostgreSQL connector can consume that replication stream without the need for additional plug-ins.
This is particularly valuable for environments where installation of plug-ins is not supported or not allowed.
For more information, see Setting up PostgreSQL.
Topic names
By default, the PostgreSQL connector writes change events for all INSERT
, UPDATE
, and DELETE
operations that occur in a table to a single Apache Kafka topic that is specific to that table.
The connector uses the following convention to name change event topics:
topicPrefix.schemaName.tableName
The following list provides definitions for the components of the default name:
- topicPrefix
-
The topic prefix as specified by the
topic.prefix
configuration property. - schemaName
-
The name of the database schema in which the change event occurred.
- tableName
-
The name of the database table in which the change event occurred.
For example, suppose that fulfillment
is the logical server name in the configuration for a connector that is capturing changes in a PostgreSQL installation that has a postgres
database and an inventory
schema that contains four tables: products
, products_on_hand
, customers
, and orders
. The connector would stream records to these four Kafka topics:
-
fulfillment.inventory.products
-
fulfillment.inventory.products_on_hand
-
fulfillment.inventory.customers
-
fulfillment.inventory.orders
Now suppose that the tables are not part of a specific schema but were created in the default public
PostgreSQL schema. The names of the Kafka topics would be:
-
fulfillment.public.products
-
fulfillment.public.products_on_hand
-
fulfillment.public.customers
-
fulfillment.public.orders
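The naming convention can be expressed as a one-line function. This Python sketch uses a hypothetical helper name and reproduces the topic names from the example above.

```python
def default_topic_name(topic_prefix, schema_name, table_name):
    """Compose a change event topic name as topicPrefix.schemaName.tableName."""
    return f"{topic_prefix}.{schema_name}.{table_name}"


tables = ["products", "products_on_hand", "customers", "orders"]
inventory_topics = [default_topic_name("fulfillment", "inventory", t) for t in tables]
public_topics = [default_topic_name("fulfillment", "public", t) for t in tables]
```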
The connector applies similar naming conventions to label its transaction metadata topics.
If the default topic names do not meet your requirements, you can configure custom topic names. To configure custom topic names, you specify regular expressions in the logical topic routing SMT. For more information about using the logical topic routing SMT to customize topic naming, see Topic routing.
Transaction metadata
Debezium can generate events that represent transaction boundaries and that enrich data change event messages.
Limits on when Debezium receives transaction metadata
Debezium registers and receives metadata only for transactions that occur after you deploy the connector. Metadata for transactions that occur before you deploy the connector is not available.
For every transaction BEGIN and END, Debezium generates an event that contains the following fields:
status
-
BEGIN or END.
id
-
String representation of the unique transaction identifier, composed of the PostgreSQL transaction ID and the LSN of the given operation, separated by a colon; that is, the format is txID:LSN.
ts_ms
-
The time of a transaction boundary event (BEGIN or END event) at the data source. If the data source does not provide Debezium with the event time, then the field instead represents the time at which Debezium processes the event.
event_count (for END events)
-
Total number of events emitted by the transaction.
data_collections (for END events)
-
An array of pairs of data_collection and event_count elements that indicates the number of events that the connector emits for changes that originate from a data collection.
{
"status": "BEGIN",
"id": "571:53195829",
"ts_ms": 1486500577125,
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "571:53195832",
"ts_ms": 1486500577691,
"event_count": 2,
"data_collections": [
{
"data_collection": "s1.a",
"event_count": 1
},
{
"data_collection": "s2.a",
"event_count": 1
}
]
}
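A consumer of the transaction topic might split the id field into its two parts and sanity-check an END event. The following Python sketch uses hypothetical function names and assumes, as in the example above, that the LSN part of the id is a decimal integer and that the per-collection counts add up to the total event_count.

```python
import json


def parse_transaction_id(transaction_id):
    """Split an id of the form txID:LSN into two integers."""
    tx_id, lsn = transaction_id.split(":")
    return int(tx_id), int(lsn)


def end_event_is_consistent(event):
    """Check that the per-collection counts of an END event sum to event_count."""
    if event["status"] != "END":
        raise ValueError("expected an END event")
    per_collection = sum(item["event_count"] for item in event["data_collections"])
    return per_collection == event["event_count"]


end_event = json.loads("""
{
  "status": "END",
  "id": "571:53195832",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {"data_collection": "s1.a", "event_count": 1},
    {"data_collection": "s2.a", "event_count": 1}
  ]
}
""")
```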
Unless overridden via the topic.transaction option, transaction events are written to the topic named <topic.prefix>.transaction.
When transaction metadata is enabled the data message Envelope
is enriched with a new transaction
field.
This field provides information about every event in the form of a composite of fields:
id
-
String representation of unique transaction identifier.
total_order
-
The absolute position of the event among all events generated by the transaction.
data_collection_order
-
The per-data collection position of the event among all events that were emitted by the transaction.
Following is an example of a message:
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"ts_us": "1580390884335451",
"ts_ns": "1580390884335451325",
"transaction": {
"id": "571:53195832",
"total_order": "1",
"data_collection_order": "1"
}
}
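One possible use of the transaction block is to regroup data change events by transaction and restore their order within it. The following Python sketch is a hypothetical consumer-side helper. It assumes that events have already been deserialized into dictionaries shaped like the example above, in which total_order arrives as a string.

```python
from collections import defaultdict


def group_by_transaction(events):
    """Group change events by transaction id, ordered by total_order."""
    grouped = defaultdict(list)
    for event in events:
        tx = event.get("transaction")
        if tx is None:
            # No transaction block, for example when metadata is not enabled.
            continue
        grouped[tx["id"]].append(event)
    for tx_events in grouped.values():
        # total_order may be a string, so compare numerically.
        tx_events.sort(key=lambda e: int(e["transaction"]["total_order"]))
    return dict(grouped)
```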
Data change events
The Debezium PostgreSQL connector generates a data change event for each row-level INSERT
, UPDATE
, and DELETE
operation. Each event contains a key and a value. The structure of the key and the value depends on the table that was changed.
Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.
The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A schema
field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:
{
"schema": { (1)
...
},
"payload": { (2)
...
},
"schema": { (3)
...
},
"payload": { (4)
...
}
}
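Item | Field name | Description |
---|---|---|
1 | schema | The first schema field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key’s payload portion. |
2 | payload | The first payload field is part of the event key. It has the structure described by the previous schema field and it contains the key for the row that was changed. |
3 | schema | The second schema field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value’s payload portion. |
4 | payload | The second payload field is part of the event value. It has the structure described by the previous schema field and it contains the actual data for the row that was changed. |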
By default, the connector streams change event records to topics with names that are the same as the event’s originating table.
Starting with Kafka 0.10, Kafka can optionally record the event key and value with the timestamp at which the message was created (recorded by the producer) or written to the log by Kafka.
The PostgreSQL connector ensures that all Kafka Connect schema names adhere to the Avro schema name format. This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or _. Each remaining character in the logical server name and each character in the schema and table names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or _. If there is an invalid character, it is replaced with an underscore character. This can lead to unexpected conflicts if the logical server name, a schema name, or a table name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.
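The replacement rule, and the kind of conflict that it can cause, can be approximated in a few lines. The following Python sketch follows the rule as it is described here. It is not Debezium's actual implementation, and the function names are hypothetical.

```python
import re

# Anything other than a Latin letter, a digit, or an underscore is invalid.
_INVALID = re.compile(r"[^A-Za-z0-9_]")


def clean_part(part, is_first=False):
    """Replace invalid characters in one name component with underscores."""
    cleaned = _INVALID.sub("_", part)
    # The logical server name must not start with a digit.
    if is_first and cleaned[:1].isdigit():
        cleaned = "_" + cleaned[1:]
    return cleaned


def adjusted_name(server, schema, table):
    """Approximate the adjusted server.schema.table portion of a schema name."""
    return ".".join(
        [clean_part(server, is_first=True), clean_part(schema), clean_part(table)]
    )
```

In this approximation the table names orders-2024 and orders_2024 both map to orders_2024, which illustrates how two distinct tables can end up with the same schema name.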
Change event keys
For a given table, the change event’s key has a structure that contains a field for each column in the primary key of the table at the time the event was created. Alternatively, if the table has REPLICA IDENTITY
set to FULL
or USING INDEX
there is a field for each unique key constraint.
Consider a customers
table defined in the public
database schema and the example of a change event key for that table.
CREATE TABLE customers (
id SERIAL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
If the topic.prefix
connector configuration property has the value PostgreSQL_server
, every change event for the customers
table while it has this definition has the same key structure, which in JSON looks like this:
{
"schema": { (1)
"type": "struct",
"name": "PostgreSQL_server.public.customers.Key", (2)
"optional": false, (3)
"fields": [ (4)
{
"name": "id",
"index": "0",
"schema": {
"type": "INT32",
"optional": "false"
}
}
]
},
"payload": { (5)
"id": "1"
}
}
Item | Field name | Description |
---|---|---|
1 | schema | The schema portion of the key specifies a Kafka Connect schema that describes what is in the key’s payload portion. |
2 | PostgreSQL_server.public.customers.Key | Name of the schema that defines the structure of the key’s payload. This schema describes the structure of the primary key for the table that was changed. Key schema names have the format connector-name.schema-name.table-name.Key. |
3 | optional | Indicates whether the event key must contain a value in its payload field. In this example, a value in the key’s payload is required. |
4 | fields | Specifies each field that is expected in the payload, including each field’s name, index, and schema. |
5 | payload | Contains the key for the row for which this change event was generated. In this example, the key contains a single id field whose value is 1. |
Although the |
If the table does not have a primary or unique key, then the change event’s key is null. The rows in a table without a primary or unique key constraint cannot be uniquely identified.
Change event values
The value in a change event is a bit more complicated than the key. Like the key, the value has a schema
section and a payload
section. The schema
section contains the schema that describes the Envelope
structure of the payload
section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.
Consider the same sample table that was used to show an example of a change event key:
CREATE TABLE customers (
id SERIAL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
The value portion of a change event for a change to this table varies according to the REPLICA IDENTITY
setting and the operation that the event is for.
Replica identity
REPLICA IDENTITY is a PostgreSQL-specific table-level setting that determines the amount of information that is available to the logical decoding plug-in for UPDATE
and DELETE
events. More specifically, the setting of REPLICA IDENTITY
controls what (if any) information is available for the previous values of the table columns involved, whenever an UPDATE
or DELETE
event occurs.
There are 4 possible values for REPLICA IDENTITY
:
-
DEFAULT - The default behavior is that UPDATE and DELETE events contain the previous values for the primary key columns of a table if that table has a primary key. For an UPDATE event, only the primary key columns with changed values are present.
If a table does not have a primary key, the connector does not emit UPDATE or DELETE events for that table. For a table without a primary key, the connector emits only create events. Typically, a table without a primary key is used for appending messages to the end of the table, which means that UPDATE and DELETE events are not useful.
-
NOTHING - Emitted events for UPDATE and DELETE operations do not contain any information about the previous value of any table column.
-
FULL - Emitted events for UPDATE and DELETE operations contain the previous values of all columns in the table.
-
INDEX index-name - Emitted events for UPDATE and DELETE operations contain the previous values of the columns contained in the specified index. UPDATE events also contain the indexed columns with the updated values.
create events
The following example shows the value portion of a change event that the connector generates for an operation that creates data in the customers
table:
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "PostgreSQL_server.inventory.customers.Value", (2)
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "PostgreSQL_server.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "PostgreSQL_server.inventory.customers.Envelope" (4)
},
"payload": { (5)
"before": null, (6)
"after": { (7)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (8)
"version": "3.0.5.Final",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"ts_us": 1559033904863123,
"ts_ns": 1559033904863123000,
"snapshot": true,
"db": "postgres",
"sequence": "[\"24023119\",\"24023128\"]",
"schema": "public",
"table": "customers",
"txId": 555,
"lsn": 24023128,
"xmin": null
},
"op": "c", (9)
"ts_ms": 1559033904863, (10)
"ts_us": 1559033904863841, (10)
"ts_ns": 1559033904863841257 (10)
}
}
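Item | Field name | Description |
---|---|---|
1 | schema | The value’s schema, which describes the structure of the value’s payload. A change event’s value schema is the same in every change event that the connector generates for a particular table. |
2 | name | In the schema section, each name field specifies the schema for a field in the value’s payload. In this example, PostgreSQL_server.inventory.customers.Value is the schema for the payload’s before and after fields. |
3 | name | io.debezium.connector.postgresql.Source is the schema for the payload’s source field. |
4 | name | PostgreSQL_server.inventory.customers.Envelope is the schema for the overall structure of the payload. |
5 | payload | The value’s actual data. This is the information that the change event is providing. |
6 | before | An optional field that specifies the state of the row before the event occurred. When the op field is c for create, as it is in this example, the before field is null because the event is for a newly created row. |
7 | after | An optional field that specifies the state of the row after the event occurred. In this example, the after field contains the values of the new row’s id, first_name, last_name, and email columns. |
8 | source | Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. In this example, the source metadata includes the version, connector, name, ts_ms, ts_us, ts_ns, snapshot, db, sequence, schema, table, txId, lsn, and xmin fields. |
9 | op | Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a row. |
10 | ts_ms, ts_us, ts_ns | Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |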
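Because the payload carries both a top-level ts_ms and a source.ts_ms, a consumer can estimate how far the connector lags behind the database. The following Python sketch is a hypothetical helper. It assumes that source.ts_ms holds the time at which the change was made in the database, and that the payload has already been deserialized into a dictionary.

```python
def processing_lag_ms(payload):
    """Return payload.ts_ms minus payload.source.ts_ms, or None if either is missing."""
    processed = payload.get("ts_ms")
    changed = (payload.get("source") or {}).get("ts_ms")
    if processed is None or changed is None:
        # Both timestamps are needed to compute a lag.
        return None
    return processed - changed


lag = processing_lag_ms(
    {"ts_ms": 1559033904961, "source": {"ts_ms": 1559033904863}}
)
```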
update events
The value of a change event for an update in the sample customers
table has the same schema as a create event for that table. Likewise, the event value’s payload has the same structure. However, the event value payload contains different values in an update event. Here is an example of a change event value in an event that the connector generates for an update in the customers
table:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1
},
"after": { (2)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (3)
"version": "3.0.5.Final",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"ts_us": 1559033904863769,
"ts_ns": 1559033904863769000,
"snapshot": false,
"db": "postgres",
"schema": "public",
"table": "customers",
"txId": 556,
"lsn": 24023128,
"xmin": null
},
"op": "u", (4)
"ts_ms": 1465584025523, (5)
"ts_us": 1465584025523514, (5)
"ts_ns": 1465584025523514964 (5)
}
}
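Item | Field name | Description |
---|---|---|
1 | before | An optional field that contains values that were in the row before the database commit. In this example, only the primary key column, id, is present because the table’s REPLICA IDENTITY setting is DEFAULT. For an update event to contain the previous values of all columns in the row, you would have to change the customers table by running ALTER TABLE customers REPLICA IDENTITY FULL. |
2 | after | An optional field that specifies the state of the row after the event occurred. In this example, the first_name value is now Anne Marie. |
3 | source | Mandatory field that describes the source metadata for the event. The source field structure has the same fields as in a create event, but some values are different. |
4 | op | Mandatory string that describes the type of operation. In an update event value, the op field value is u, signifying that this row changed because of an update. |
5 | ts_ms, ts_us, ts_ns | Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |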
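Updating the columns for a row’s primary/unique key changes the value of the row’s key. When a key changes, Debezium outputs three events: a delete event and a tombstone event with the old key for the row, followed by a create event with the new key for the row.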
Primary key updates
An UPDATE operation that changes a row’s primary key field(s) is known as a primary key change. For a primary key change, in place of sending an UPDATE event record, the connector sends a DELETE event record for the old key and a CREATE event record for the new (updated) key. These events have the usual structure and content, and in addition, each one has a message header related to the primary key change:
-
The DELETE event record has __debezium.newkey as a message header. The value of this header is the new primary key for the updated row.
-
The CREATE event record has __debezium.oldkey as a message header. The value of this header is the previous (old) primary key that the updated row had.
delete events
The value in a delete change event has the same schema
portion as create and update events for the same table. The payload
portion in a delete event for the sample customers
table looks like this:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1
},
"after": null, (2)
"source": { (3)
"version": "3.0.5.Final",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"ts_us": 1559033904863852,
"ts_ns": 1559033904863852000,
"snapshot": false,
"db": "postgres",
"schema": "public",
"table": "customers",
"txId": 556,
"lsn": 46523128,
"xmin": null
},
"op": "d", (4)
"ts_ms": 1465581902461, (5)
"ts_us": 1465581902461496, (5)
"ts_ns": 1465581902461496187 (5)
}
}
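Item | Field name | Description |
---|---|---|
1 | before | Optional field that specifies the state of the row before the event occurred. In a delete event value, the before field contains the values that were in the row before it was deleted. In this example, the before field contains only the primary key column because the table’s REPLICA IDENTITY setting is DEFAULT. |
2 | after | Optional field that specifies the state of the row after the event occurred. In a delete event value, the after field is null, signifying that the row no longer exists. |
3 | source | Mandatory field that describes the source metadata for the event. In a delete event value, the source field structure is the same as for create and update events for the same table. |
4 | op | Mandatory string that describes the type of operation. The op field value is d, signifying that this row was deleted. |
5 | ts_ms, ts_us, ts_ns | Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |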
A delete change event record provides a consumer with the information it needs to process the removal of this row.
For a consumer to be able to process a delete event generated for a table that does not have a primary key, set the table’s REPLICA IDENTITY to FULL.
PostgreSQL connector events are designed to work with Kafka log compaction. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.
When a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be null
. To make this possible, the PostgreSQL connector follows a delete event with a special tombstone event that has the same key but a null
value.
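To show how a consumer might treat delete events and tombstones, the following Python sketch keeps an in-memory copy of a table keyed by the event key. It is a hypothetical helper, not part of Debezium. It assumes that key and value are the already deserialized payload portions of a record, and that snapshot reads use the op value r.

```python
def apply_change(state, key, value):
    """Apply one change event record to an in-memory view of a table."""
    if key is None:
        # Records without a key (for example truncate events) identify no row.
        return
    row_id = tuple(sorted(key.items()))
    if value is None:
        # Tombstone: same key, null value. The row is already gone or is removed now.
        state.pop(row_id, None)
        return
    op = value["op"]
    if op in ("c", "u", "r"):
        state[row_id] = value["after"]
    elif op == "d":
        state.pop(row_id, None)


table_view = {}
apply_change(table_view, {"id": 1}, {"op": "c", "before": None, "after": {"id": 1, "first_name": "Anne"}})
apply_change(table_view, {"id": 1}, {"op": "d", "before": {"id": 1}, "after": None})
apply_change(table_view, {"id": 1}, None)
```

Handling the tombstone as an idempotent removal means that the consumer behaves the same whether or not the topic has been compacted.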
truncate events
A truncate change event signals that a table has been truncated.
The message key is null in this case. The message value looks like this:
{
"schema": { ... },
"payload": {
"source": { (1)
"version": "3.0.5.Final",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"ts_us": 1559033904863112,
"ts_ns": 1559033904863112000,
"snapshot": false,
"db": "postgres",
"schema": "public",
"table": "customers",
"txId": 556,
"lsn": 46523128,
"xmin": null
},
"op": "t", (2)
"ts_ms": 1559033904961, (3)
"ts_us": 1559033904961654, (3)
"ts_ns": 1559033904961654789 (3)
}
}
Item | Field name | Description |
---|---|---|
1 | source | Mandatory field that describes the source metadata for the event. In a truncate event value, the source field structure is the same as for create, update, and delete events for the same table. |
2 | op | Mandatory string that describes the type of operation. The op field value is t, signifying that this table was truncated. |
3 | ts_ms, ts_us, ts_ns | Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |
If a single TRUNCATE
operation affects multiple tables, the connector emits one truncate change event record for each truncated table.
A truncate event represents a change that is made to an entire table and it has no message key. As a result, for topics with multiple partitions, there is no ordering guarantee for the change events (create, update, and so forth), or truncate events that pertain to a table. For example, if a consumer reads events for a table from multiple partitions, it might receive an update event for a table from one partition after it receives a truncate event that deletes all of the data in the table from another partition. Ordering is guaranteed only for topics that use a single partition.
If you do not want the connector to capture truncate events, use the skipped.operations
option to filter them out.
message events
This event type is only supported through the pgoutput plug-in on PostgreSQL 14 and later.
A message event signals that a generic logical decoding message has been inserted directly into the WAL typically with the pg_logical_emit_message
function.
The message key is a Struct
with a single field named prefix
in this case, carrying the prefix specified when inserting the message.
The message value looks like this for transactional messages:
{
"schema": { ... },
"payload": {
"source": { (1)
"version": "3.0.5.Final",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"ts_us": 1559033904863879,
"ts_ns": 1559033904863879000,
"snapshot": false,
"db": "postgres",
"schema": "",
"table": "",
"txId": 556,
"lsn": 46523128,
"xmin": null
},
"op": "m", (2)
"ts_ms": 1559033904961, (3)
"ts_us": 1559033904961621, (3)
"ts_ns": 1559033904961621379, (3)
"message": { (4)
"prefix": "foo",
"content": "Ymfy"
}
}
}
Unlike other event types, non-transactional messages do not have any associated BEGIN or END transaction events.
The message value looks like this for non-transactional messages:
{
"schema": { ... },
"payload": {
"source": { (1)
"version": "3.0.5.Final",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"ts_us": 1559033904863762,
"ts_ns": 1559033904863762000,
"snapshot": false,
"db": "postgres",
"schema": "",
"table": "",
"lsn": 46523128,
"xmin": null
},
"op": "m", (2)
"ts_ms": 1559033904961, (3)
"ts_us": 1559033904961741, (3)
"ts_ns": 1559033904961741698, (3)
"message": { (4)
"prefix": "foo",
"content": "Ymfy"
}
}
}
Item | Field name | Description |
---|---|---|
1 | source | Mandatory field that describes the source metadata for the event. In a message event value, the source field carries empty schema and table values, and it includes txId only when the message is transactional, as the preceding examples show. |
2 | op | Mandatory string that describes the type of operation. The op value for a message event is m. |
3 | ts_ms, ts_us, ts_ns | Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |
4 | message | Field that contains the message metadata: the prefix and the content of the message. |
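As a consumer-side illustration, the following Python sketch extracts the prefix and content from a message event value like the ones shown above. It assumes that the event was serialized by the Kafka Connect JSON converter and that the content bytes arrive as a base64 string; the function name and the sample payload are hypothetical and are not part of Debezium.

```python
import base64
import json


def decode_message_event(raw_value: str) -> tuple[str, bytes, bool]:
    """Return (prefix, content bytes, is_transactional) for a message event.

    Assumes the JSON layout shown above, with `content` encoded as a
    base64 string (an assumption about the serializer in use).
    """
    payload = json.loads(raw_value)["payload"]
    if payload.get("op") != "m":
        raise ValueError("not a message event")
    message = payload["message"]
    content = base64.b64decode(message["content"])
    # Only transactional messages carry a txId in the source block.
    is_transactional = payload["source"].get("txId") is not None
    return message["prefix"], content, is_transactional


# Hypothetical sample value; "YmFy" is the base64 encoding of b"bar".
sample = json.dumps({
    "payload": {
        "source": {"txId": 556, "lsn": 46523128},
        "op": "m",
        "message": {"prefix": "foo", "content": "YmFy"},
    }
})
```

Calling decode_message_event(sample) yields the prefix, the decoded bytes, and a flag that tells the consumer whether to expect surrounding transaction events.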
Data type mappings
The PostgreSQL connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the PostgreSQL data type of the column. The following sections describe how the connector maps PostgreSQL data types to a literal type and a semantic type in event fields.
- literal type describes how the value is literally represented using Kafka Connect schema types: INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, and STRUCT.
- semantic type describes how the Kafka Connect schema captures the meaning of the field using the name of the Kafka Connect schema for the field.
If the default data type conversions do not meet your needs, you can create a custom converter for the connector.
Basic types
The following table describes how the connector maps basic types.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
n/a |
|
|
n/a |
|
|
|
|
|
|
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
n/a |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
|
Temporal types
Other than PostgreSQL’s TIMESTAMPTZ and TIMETZ data types, which contain time zone information, how temporal types are mapped depends on the value of the time.precision.mode connector configuration property. The following sections describe these mappings:
time.precision.mode=adaptive
When the time.precision.mode property is set to adaptive, the default, the connector determines the literal type and semantic type based on the column’s data type definition. This ensures that events exactly represent the values in the database.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
time.precision.mode=adaptive_time_microseconds
When the time.precision.mode configuration property is set to adaptive_time_microseconds, the connector determines the literal type and semantic type for temporal types based on the column’s data type definition. This ensures that events exactly represent the values in the database, except that all TIME fields are captured as microseconds.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
time.precision.mode=connect
When the time.precision.mode configuration property is set to connect, the connector uses Kafka Connect logical types. This may be useful when consumers can handle only the built-in Kafka Connect logical types and are unable to handle variable-precision time values. However, because PostgreSQL supports microsecond precision, events generated by a connector that uses the connect time precision mode lose precision when the database column has a fractional second precision value that is greater than 3.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
|
|
|
|
|
|
|
TIMESTAMP type
The TIMESTAMP type represents a timestamp without time zone information. Such columns are converted into an equivalent Kafka Connect value based on UTC. For example, the TIMESTAMP value "2018-06-20 15:13:16.945104" is represented by an io.debezium.time.MicroTimestamp with the value "1529507596945104" when time.precision.mode is not set to connect.
The timezone of the JVM running Kafka Connect and Debezium does not affect this conversion.
PostgreSQL supports using +/-infinite values in TIMESTAMP columns. These special values are converted to timestamps with the value 9223372036825200000 for positive infinity or -9223372036832400000 for negative infinity. This behavior mimics the standard behavior of the PostgreSQL JDBC driver. For reference, see the org.postgresql.PGStatement interface.
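To make the TIMESTAMP example concrete, the following Python sketch converts a microsecond-precision value back into a UTC datetime on the consumer side. The function and constant names are hypothetical. Treating the two sentinel numbers quoted above as "infinite" markers in this representation is an assumption; the sketch returns None for them because they fall outside the range that a Python datetime can hold.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Sentinel values quoted in the text above for +/- infinity.
POSITIVE_INFINITY = 9223372036825200000
NEGATIVE_INFINITY = -9223372036832400000


def micro_timestamp_to_datetime(micros: int) -> Optional[datetime]:
    """Convert microseconds since the epoch to an aware UTC datetime.

    Returns None for the infinity sentinels, which a datetime cannot hold.
    """
    if micros in (POSITIVE_INFINITY, NEGATIVE_INFINITY):
        return None
    # Integer timedelta arithmetic avoids the rounding that a float
    # division by 1e6 could introduce in the microsecond digits.
    return EPOCH + timedelta(microseconds=micros)
```

With the value from the example, micro_timestamp_to_datetime(1529507596945104) gives 2018-06-20 15:13:16.945104 in UTC, which matches the original column value.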
Decimal types
The setting of the PostgreSQL connector configuration property decimal.handling.mode determines how the connector maps decimal types.
When the decimal.handling.mode property is set to precise, the connector uses the Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL, NUMERIC, and MONEY columns. This is the default mode.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
|
|
|
|
|
|
|
There is an exception to this rule. When the NUMERIC or DECIMAL types are used without scale constraints, the values coming from the database have a different (variable) scale for each value. In this case, the connector uses io.debezium.data.VariableScaleDecimal, which contains both the value and the scale of the transferred value.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
|
|
|
|
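The following Python sketch shows how a consumer might turn such a value-plus-scale structure back into a decimal number. It rests on several assumptions that are not confirmed by the text above: that the structure exposes fields named scale and value, that the event was produced by the JSON converter so the bytes arrive as a base64 string, and that the bytes hold the unscaled value as a big-endian two's-complement integer (the encoding that Kafka Connect uses for its Decimal logical type). The function name is hypothetical.

```python
import base64
from decimal import Decimal


def decode_variable_scale_decimal(struct: dict) -> Decimal:
    """Rebuild a Decimal from a {"scale": int, "value": base64 str} struct.

    Field names and base64 transport are assumptions; adjust them to the
    schema that your converter actually emits.
    """
    raw = base64.b64decode(struct["value"])
    # Interpret the bytes as a signed, big-endian unscaled integer.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-struct["scale"])
```

For example, the bytes 0x30 0x39 (base64 "MDk=") encode the unscaled integer 12345, so a scale of 2 produces 123.45. The same byte decoding applies to fixed-scale columns in precise mode, where the scale would come from the field schema rather than from the value itself.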
When the decimal.handling.mode property is set to double, the connector represents all DECIMAL, NUMERIC, and MONEY values as Java double values and encodes them as shown in the following table.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) |
---|---|---|
|
|
|
|
|
|
|
|
The last possible setting for the decimal.handling.mode configuration property is string. In this case, the connector represents DECIMAL, NUMERIC, and MONEY values as their formatted string representation, and encodes them as shown in the following table.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) |
---|---|---|
|
|
|
|
|
|
|
|
PostgreSQL supports NaN (not a number) as a special value that can be stored in DECIMAL/NUMERIC columns. When decimal.handling.mode is set to double or string, the connector encodes NaN as Double.NaN or as the string constant NAN, respectively.
HSTORE type
The setting of the PostgreSQL connector configuration property hstore.handling.mode determines how the connector maps HSTORE values.
When the hstore.handling.mode property is set to json (the default), the connector represents HSTORE values as string representations of JSON values and encodes them as shown in the following table.
When the hstore.handling.mode property is set to map, the connector uses the MAP schema type for HSTORE values.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
|
|
|
n/a |
Domain types
PostgreSQL supports user-defined types that are based on other underlying types. When such column types are used, Debezium exposes the column’s representation based on the full type hierarchy.
Capturing changes in columns that use PostgreSQL domain types requires special consideration. When a column is defined to contain a domain type that extends one of the default database types and the domain type defines a custom length or scale, the generated schema inherits that defined length or scale. When a column is defined to contain a domain type that extends another domain type that defines a custom length or scale, the generated schema does not inherit the defined length or scale because that information is not available in the PostgreSQL driver’s column metadata. |
Network address types
PostgreSQL has data types that can store IPv4, IPv6, and MAC addresses. It is better to use these types instead of plain text types to store network addresses. Network address types offer input error checking and specialized operators and functions.
PostgreSQL data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
PostGIS types
The PostgreSQL connector supports all PostGIS data types.
PostGIS data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
For format details, see Open Geospatial Consortium Simple Features Access specification. |
|
|
For format details, see Open Geospatial Consortium Simple Features Access specification. |
pgvector types
The PostgreSQL connector supports all pgvector extension data types.
pgvector data type | Literal type (schema type) | Semantic type (schema name) and Notes |
---|---|---|
|
|
|
|
|
|
|
|
Contains a structure that includes the following fields:
|
Toasted values
PostgreSQL has a hard limit on the page size. This means that values larger than approximately 8 KB must be stored by using TOAST storage. This affects the replication messages that come from the database. Values that were stored by using the TOAST mechanism and that have not been changed are not included in the message, unless they are part of the table’s replica identity. There is no safe way for Debezium to read the missing value out of band directly from the database, because doing so could lead to race conditions. Consequently, Debezium follows these rules to handle toasted values:
- Tables with REPLICA IDENTITY FULL - TOAST column values are part of the before and after fields in change events just like any other column.
- Tables with REPLICA IDENTITY DEFAULT - When receiving an UPDATE event from the database, any unchanged TOAST column value that is not part of the replica identity is not contained in the event. Similarly, when receiving a DELETE event, no TOAST columns, if any, are in the before field. As Debezium cannot safely provide the column value in this case, the connector returns a placeholder value as defined by the connector configuration property, unavailable.value.placeholder.
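As an illustration of how a consumer might cope with the placeholder, the following Python sketch merges the after image of an update event into a locally held copy of the row and keeps the existing value wherever the placeholder appears. The helper name is hypothetical, and the placeholder string shown is assumed to be the connector default; match it to whatever unavailable.value.placeholder is set to in your deployment.

```python
# Assumed default placeholder; match it to the connector's
# unavailable.value.placeholder setting.
UNAVAILABLE_PLACEHOLDER = "__debezium_unavailable_value"


def apply_update(current_row: dict, after: dict,
                 placeholder: str = UNAVAILABLE_PLACEHOLDER) -> dict:
    """Merge an update event's `after` image into a locally held row."""
    merged = dict(current_row)
    for column, value in after.items():
        if value == placeholder:
            # Unchanged TOAST column: keep the value we already have.
            continue
        merged[column] = value
    return merged


# Hypothetical row and event image for demonstration.
row = {"id": 1, "title": "old", "body": "a very long text value"}
after_image = {"id": 1, "title": "new", "body": UNAVAILABLE_PLACEHOLDER}
updated = apply_update(row, after_image)
```

This approach only works when the consumer already holds an earlier copy of the row. Consumers that cannot keep state would typically need REPLICA IDENTITY FULL on the source table instead.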
Default values
If a default value is specified for a column in the database schema, the PostgreSQL connector will attempt to propagate this value to the Kafka schema whenever possible. Most common data types are supported, including:
- BOOLEAN
- Numeric types (INT, FLOAT, NUMERIC, etc.)
- Text types (CHAR, VARCHAR, TEXT, etc.)
- Temporal types (DATE, TIME, INTERVAL, TIMESTAMP, TIMESTAMPTZ)
- JSON, JSONB, XML
- UUID
Note that for temporal types, parsing of the default value is provided by PostgreSQL libraries; therefore, any string representation which is normally supported by PostgreSQL should also be supported by the connector.
In the case that the default value is generated by a function rather than being directly specified in-line, the connector instead exports the equivalent of 0 for the given data type. These values include:
- FALSE for BOOLEAN
- 0 with appropriate precision, for numeric types
- Empty string for text/XML types
- {} for JSON types
- 1970-01-01 for DATE, TIMESTAMP, TIMESTAMPTZ types
- 00:00 for TIME
- EPOCH for INTERVAL
- 00000000-0000-0000-0000-000000000000 for UUID
This support currently extends only to explicit usage of functions. For example, CURRENT_TIMESTAMP(6) is supported with parentheses, but CURRENT_TIMESTAMP is not.
Support for the propagation of default values exists primarily to allow for safe schema evolution when using the PostgreSQL connector with a schema registry that enforces compatibility between schema versions. Because of this primary concern, and because of the refresh behaviors of the different plug-ins, the default value present in the Kafka schema is not guaranteed to always be in sync with the default value in the database schema.
This behavior may be unexpected, but it is still safe. Only the schema definition is affected, while the real values present in the message remain consistent with what was written to the source database.
Custom converters
By default, Debezium does not replicate data from columns with custom data types, such as composite types that are created by using SQL CREATE TYPE statements.
To replicate columns with custom data types, follow the instructions for creating a custom converter, with a few important caveats:
- Set the include.unknown.datatypes property in the connector configuration to true. The default false setting causes the custom converter to always return null values.
- The type of value that is passed to the converter depends on the logical decoding output plug-in that is configured for the replication slot.
  - decoderbufs passes a byte array (byte[]) representation of the column data.
  - pgoutput passes a string representation of the column data.
Setting up Postgres
Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server, decide which logical decoding plug-in you intend to use.
If you do not plan to use the native pgoutput logical replication stream support, you must install the logical decoding plug-in into the PostgreSQL server. Afterward, enable a replication slot, and configure a user with sufficient privileges to perform the replication.
If your database is hosted by a service such as Heroku Postgres, you might be unable to install the plug-in. If so, and if you are using PostgreSQL 10+, you can use the pgoutput decoder support to capture changes in your database. If that is not an option, you are unable to use Debezium with your database.
PostgreSQL in the Cloud
PostgreSQL on Amazon RDS
It is possible to capture changes in a PostgreSQL database that is running in Amazon RDS. To do this:
- Set the instance parameter rds.logical_replication to 1.
- Verify that the wal_level parameter is set to logical by running the query SHOW wal_level as the database RDS master user. This might not be the case in multi-zone replication setups. You cannot set this option manually. It is automatically changed when the rds.logical_replication parameter is set to 1. If the wal_level is not set to logical after you make the preceding change, it is probably because the instance has to be restarted after the parameter group change. Restarts occur during your maintenance window, or you can initiate a restart manually.
- Set the Debezium plugin.name parameter to pgoutput.
- Initiate logical replication from an AWS account that has the rds_replication role. The role grants permissions to manage logical slots and to stream data using logical slots. By default, only the master user account on AWS has the rds_replication role on Amazon RDS. To enable a user account other than the master account to initiate logical replication, you must grant the account the rds_replication role. For example, grant rds_replication to <my_user>. You must have superuser access to grant the rds_replication role to a user. To enable accounts other than the master account to create an initial snapshot, you must grant SELECT permission to the accounts on the tables to be captured. For more information about security for PostgreSQL logical replication, see the PostgreSQL documentation.
PostgreSQL on Azure
It is possible to use Debezium with Azure Database for PostgreSQL, which supports the pgoutput logical decoding plug-in that Debezium uses.
Set the Azure replication support to logical. You can use the Azure CLI or the Azure Portal to configure this. For example, to use the Azure CLI, run the following az postgres server commands:
az postgres server configuration set --resource-group mygroup --server-name myserver --name azure.replication_support --value logical
az postgres server restart --resource-group mygroup --name myserver
PostgreSQL on CrunchyBridge
It is possible to use Debezium with CrunchyBridge; logical replication is already turned on. The pgoutput plug-in is available. You must create a replication user and grant it the correct privileges.
While using the |
Installing the logical decoding output plug-in
For more detailed instructions about setting up and testing logical decoding plug-ins, see Logical Decoding Output Plug-in Installation for PostgreSQL.
As of PostgreSQL 9.4, the only way to read changes to the write-ahead log is to install a logical decoding output plug-in. Plug-ins are written in C, compiled, and installed on the machine that runs the PostgreSQL server. Plug-ins use a number of PostgreSQL-specific APIs, as described by the PostgreSQL documentation.
The PostgreSQL connector works with one of Debezium’s supported logical decoding plug-ins to receive change events from the database in either the Protobuf format or the pgoutput format.
The pgoutput plug-in comes out-of-the-box with the PostgreSQL database.
For more details on using Protobuf via the decoderbufs plug-in, see the plug-in documentation, which discusses its requirements, limitations, and how to compile it.
For simplicity, Debezium also provides a container image based on the upstream PostgreSQL server image, on top of which it compiles and installs the plug-ins. You can use this image as an example of the detailed steps required for the installation.
The Debezium logical decoding plug-ins have been installed and tested on only Linux machines. For Windows and other operating systems, different installation steps might be required. |
Plug-in differences
Plug-in behavior is not completely the same for all cases. These differences have been identified:
- While all plug-ins refresh schema metadata from the database upon detection of a schema change during streaming, the pgoutput plug-in is somewhat more 'eager' about triggering such refreshes. For example, a change to the default value for a column triggers a refresh with pgoutput, while other plug-ins are not aware of this change until another change triggers a refresh (for example, the addition of a new column). This is due to the behavior of pgoutput, rather than Debezium itself.
All up-to-date differences are tracked in a test suite Java class.
Configuring the PostgreSQL server
If you are using a logical decoding plug-in other than pgoutput, after installing it, configure the PostgreSQL server as follows:
- To load the plug-in at startup, add the following to the postgresql.conf file:
  # MODULES
  shared_preload_libraries = 'decoderbufs' (1)
  (1) Instructs the server to load the decoderbufs logical decoding plug-in at startup (the name of the plug-in is set in the Protobuf make file).
- To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:
  # REPLICATION
  wal_level = logical (1)
  (1) Instructs the server to use logical decoding with the write-ahead log.
Depending on your requirements, you may have to set other PostgreSQL streaming replication parameters when using Debezium.
Examples include max_wal_senders and max_replication_slots for increasing the number of connectors that can access the sending server concurrently, and max_slot_wal_keep_size (available in PostgreSQL 13 and later) for limiting the maximum WAL size that a replication slot retains. The similarly named wal_keep_size parameter instead sets the minimum amount of past WAL that the server keeps for standby servers.
For more information about configuring streaming replication, see the PostgreSQL documentation.
Debezium uses PostgreSQL’s logical decoding, which uses replication slots. Replication slots are guaranteed to retain all WAL segments required for Debezium even during Debezium outages. For this reason, it is important to closely monitor replication slots to avoid too much disk consumption and other conditions that can happen such as catalog bloat if a replication slot stays unused for too long. For more information, see the PostgreSQL streaming replication documentation.
When connecting to a Postgres primary (i.e. not a read replica) and the version of Postgres is 17 or later, Debezium will create a replication slot enabled for failover. This means Debezium can continue to read changes from a replica promoted to primary in case of a failure, without missing any events. This requires the state of the slot to be synchronized from primary to replica server; refer to the Postgres documentation for the details.
If you are working with a synchronous_commit setting other than on, the recommendation is to set wal_writer_delay to a value such as 10 milliseconds to achieve a low latency of change events. Otherwise, its default value is applied, which adds a latency of about 200 milliseconds.
Reading and understanding PostgreSQL documentation about the mechanics and configuration of the PostgreSQL write-ahead log is strongly recommended.
Setting up permissions
Setting up a PostgreSQL server to run a Debezium connector requires a database user that can perform replications. Replication can be performed only by a database user that has appropriate permissions and only for a configured number of hosts.
Although, by default, superusers have the necessary REPLICATION and LOGIN roles, as mentioned in Security, it is best not to provide the Debezium replication user with elevated privileges. Instead, create a Debezium user that has the minimum required privileges.
- PostgreSQL administrative permissions.
- To provide a user with replication permissions, define a PostgreSQL role that has at least the REPLICATION and LOGIN permissions, and then grant that role to the user. For example:
  CREATE ROLE <name> REPLICATION LOGIN;
Setting privileges to enable Debezium to create PostgreSQL publications when you use pgoutput
If you use pgoutput as the logical decoding plug-in, Debezium must operate in the database as a user with specific privileges.
Debezium streams change events for PostgreSQL source tables from publications that are created for the tables. Publications contain a filtered set of change events that are generated from one or more tables. The data in each publication is filtered based on the publication specification. The specification can be created by the PostgreSQL database administrator or by the Debezium connector. To permit the Debezium PostgreSQL connector to create publications and specify the data to replicate to them, the connector must operate with specific privileges in the database.
There are several options for determining how publications are created. In general, it is best to manually create publications for the tables that you want to capture, before you set up the connector. However, you can configure your environment in a way that permits Debezium to create publications automatically, and to specify the data that is added to them.
Debezium uses include list and exclude list properties to specify how data is inserted in the publication.
For more information about the options for enabling Debezium to create publications, see publication.autocreate.mode.
For Debezium to create a PostgreSQL publication, it must run as a user that has the following privileges:
- Replication privileges in the database to add the table to a publication.
- CREATE privileges on the database to add publications.
- SELECT privileges on the tables to copy the initial table data. Table owners automatically have SELECT permission for the table.
To add tables to a publication, the user must be an owner of the table. But because the source table already exists, you need a mechanism to share ownership with the original owner. To enable shared ownership, you create a PostgreSQL replication group, and then add the existing table owner and the replication user to the group.
- Create a replication group.
  CREATE ROLE <replication_group>;
- Add the original owner of the table to the group.
  GRANT <replication_group> TO <original_owner>;
- Add the Debezium replication user to the group.
  GRANT <replication_group> TO <replication_user>;
- Transfer ownership of the table to <replication_group>.
  ALTER TABLE <table_name> OWNER TO <replication_group>;
For Debezium to specify the capture configuration, the value of publication.autocreate.mode must be set to filtered.
Configuring PostgreSQL to allow replication with the Debezium connector host
To enable Debezium to replicate PostgreSQL data, you must configure the database to permit replication with the host that runs the PostgreSQL connector.
To specify the clients that are permitted to replicate with the database, add entries to the PostgreSQL host-based authentication file, pg_hba.conf. For more information about the pg_hba.conf file, see the PostgreSQL documentation.
- Add entries to the pg_hba.conf file to specify the Debezium connector hosts that can replicate with the database host. For example:
pg_hba.conf file example:
  local   replication   <youruser>                  trust   (1)
  host    replication   <youruser>   127.0.0.1/32   trust   (2)
  host    replication   <youruser>   ::1/128        trust   (3)
Table 27. Descriptions of pg_hba.conf settings
Item | Description |
---|---|
1 | Instructs the server to allow replication for <youruser> locally, that is, on the server machine. |
2 | Instructs the server to allow <youruser> on localhost to receive replication changes using IPv4. |
3 | Instructs the server to allow <youruser> on localhost to receive replication changes using IPv6. |
For more information about network masks, see the PostgreSQL documentation.
Supported PostgreSQL topologies
The PostgreSQL connector can be used with a standalone PostgreSQL server or with a cluster of PostgreSQL servers.
PostgreSQL versions earlier than 16 support logical replication slots on only primary servers. This means that, for those versions, a replica in a PostgreSQL cluster cannot be configured for logical replication, and consequently the Debezium PostgreSQL connector can connect and communicate with only the primary server. Should this server fail, the connector stops. When the cluster is repaired, if the original primary server is once again promoted to primary, you can restart the connector. However, if a different PostgreSQL server with the plug-in and proper configuration is promoted to primary, you must change the connector configuration to point to the new primary server, and then you can restart the connector.
When using PostgreSQL 16 or newer, you can also set up logical replication slots on replica servers, so Debezium does not need to connect to your primary server to ingest change events. Compared with connecting to the primary, however, connecting to a replica yields slightly increased latency.
When using PostgreSQL 17 or newer, you can set up logical replication slots on primary servers that are enabled for failover. The state of such a failover slot can automatically be propagated to one or more replica servers, allowing Debezium to continue to ingest changes from a replica promoted to primary after a failure, without missing any events.
WAL disk space consumption
In certain cases, it is possible for PostgreSQL disk space consumed by WAL files to spike or increase out of usual proportions. There are several possible reasons for this situation:
- The LSN up to which the connector has received data is available in the confirmed_flush_lsn column of the server’s pg_replication_slots view. Data that is older than this LSN is no longer available, and the database is responsible for reclaiming the disk space.
  Also in the pg_replication_slots view, the restart_lsn column contains the LSN of the oldest WAL that the connector might require. If the value for confirmed_flush_lsn is regularly increasing and the value of restart_lsn lags, then the database needs to reclaim the space.
  The database typically reclaims disk space in batch blocks. This is expected behavior and no action by a user is necessary.
- There are many updates in a database that is being tracked, but only a tiny number of updates are related to the table(s) and schema(s) for which the connector is capturing changes. This situation can be easily solved with periodic heartbeat events. Set the heartbeat.interval.ms connector configuration property.
  For the connector to detect and process events from a heartbeat table, you must add the table to the PostgreSQL publication specified by the publication.name property. If this publication predates your Debezium deployment, the connector uses the publications as defined. If the publication is not already configured to automatically replicate changes FOR ALL TABLES in the database, you must explicitly add the heartbeat table to the publication, for example, ALTER PUBLICATION <publicationName> ADD TABLE <heartbeatTableName>;
- The PostgreSQL instance contains multiple databases and one of them is a high-traffic database. Debezium captures changes in another database that is low-traffic in comparison to the other database. Debezium then cannot confirm the LSN, because replication slots work per-database and Debezium is not invoked. As WAL is shared by all databases, the amount used tends to grow until an event is emitted by the database for which Debezium is capturing changes. To overcome this, it is necessary to:
  - Enable periodic heartbeat record generation with the heartbeat.interval.ms connector configuration property.
  - Regularly emit change events from the database for which Debezium is capturing changes.
  A separate process would then periodically update the table by either inserting a new row or repeatedly updating the same row. PostgreSQL then invokes Debezium, which confirms the latest LSN and allows the database to reclaim the WAL space. This task can be automated by means of the heartbeat.action.query connector configuration property.
For users on AWS RDS with PostgreSQL, a situation similar to the high traffic/low traffic scenario can occur in an idle environment. AWS RDS frequently (every 5 minutes) writes to its own system tables, and these writes are invisible to clients. Again, regularly emitting events solves the problem.
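When monitoring the situations described above, it can help to express the gap between two LSNs as a number of bytes. The following Python sketch parses the textual LSN format that PostgreSQL displays (two hexadecimal numbers separated by a slash) and subtracts two positions. The function names are hypothetical, and how you obtain the inputs (for example, restart_lsn from pg_replication_slots and the server's current WAL position) is left as an assumption about your monitoring setup.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a textual LSN such as '0/2C5E2F8' to a 64-bit integer."""
    high, low = lsn.split("/")
    # The part before the slash holds the upper 32 bits.
    return (int(high, 16) << 32) | int(low, 16)


def retained_wal_bytes(current_wal_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL between a slot's restart_lsn and the current position."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(restart_lsn)
```

For instance, lsn_to_int("0/2C5E2F8") equals 46523128, the same numeric form that appears in the lsn field of the event examples in this document. A steadily growing result from retained_wal_bytes suggests that the slot is holding back WAL and that heartbeats may be needed.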
Setting up multiple connectors for same database server
Debezium uses replication slots to stream changes from a database. These replication slots maintain the current position in the form of an LSN (Log Sequence Number), which is a pointer to a location in the WAL being consumed by the Debezium connector. This helps PostgreSQL keep the WAL available until it is processed by Debezium. A single replication slot can exist only for a single consumer or process, because different consumers might have different state and might need data from different positions.
Since a replication slot can only be used by a single connector, it is essential to create a unique replication slot for each Debezium connector. Although Postgres might allow another connector to consume the replication slot while a connector is not active, doing so is dangerous: a slot emits each change just once, so sharing it can lead to data loss [See More].
In addition to a replication slot, Debezium uses a publication to stream events when using the pgoutput plug-in. Similar to a replication slot, a publication is defined at the database level, for a set of tables. Thus, you need a unique publication for each connector, unless the connectors work on the same set of tables. For more information about the options for enabling Debezium to create publications, see publication.autocreate.mode.
See slot.name and publication.name on how to set a unique replication slot name and publication name for each connector.
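As a rough pre-deployment check of the uniqueness requirement, the following Python sketch scans a set of connector configurations and reports replication slot names that more than one connector would use on the same server. The function name and the sample configurations are hypothetical. Grouping by host, port, and slot name is an assumption that the configurations address a server consistently, and the sketch assumes that every configuration sets slot.name explicitly.

```python
from collections import defaultdict


def find_shared_slots(connectors: dict[str, dict[str, str]]) -> dict[tuple, list[str]]:
    """Map (host, port, slot name) to the connectors that share it."""
    users = defaultdict(list)
    for name, config in connectors.items():
        key = (
            config["database.hostname"],
            config.get("database.port", "5432"),
            config["slot.name"],
        )
        users[key].append(name)
    # Keep only the slots claimed by more than one connector.
    return {key: names for key, names in users.items() if len(names) > 1}


# Hypothetical connector configurations for demonstration.
configs = {
    "inventory": {"database.hostname": "db1", "slot.name": "dbz_inventory"},
    "orders": {"database.hostname": "db1", "slot.name": "dbz_inventory"},
    "audit": {"database.hostname": "db1", "slot.name": "dbz_audit"},
}
conflicts = find_shared_slots(configs)
```

Here the check flags the inventory and orders connectors because both name the dbz_inventory slot. A similar pass over publication.name could be added for connectors that capture different sets of tables.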
Upgrading PostgreSQL
When you upgrade the PostgreSQL database that Debezium uses, you must take specific steps to protect against data loss and to ensure that Debezium continues to operate. In general, Debezium is resilient to interruptions caused by network failures and other outages. For example, when a database server that a connector monitors stops or crashes, after the connector re-establishes communication with the PostgreSQL server, it continues to read from the last position recorded by the log sequence number (LSN) offset. The connector retrieves information about the last recorded offset from the Kafka Connect offsets topic, and queries the configured PostgreSQL replication slot for a log sequence number (LSN) with the same value.
For the connector to start and to capture change events from a PostgreSQL database, a replication slot must be present. However, as part of the PostgreSQL upgrade process, replication slots are removed, and the original slots are not restored after the upgrade completes. As a result, when the connector restarts and requests the last known offset from the replication slot, PostgreSQL cannot return the information.
You can create a new replication slot, but you must do more than create a new slot to guard against data loss. A new replication slot can provide the LSNs only for changes that occur after you create the slot; it cannot provide the offsets for events that occurred before the upgrade. When the connector restarts, it first requests the last known offset from the Kafka offsets topic. It then sends a request to the replication slot to return information for the offset retrieved from the offsets topic. But the new replication slot cannot provide the information that the connector needs to resume streaming from the expected position. The connector then skips any existing change events in the log, and only resumes streaming from the most recent position in the log. This can lead to silent data loss: the connector emits no records for the skipped events, and it does not provide any information to indicate that events were skipped.
For guidance about how to perform a PostgreSQL database upgrade so that Debezium can continue to capture events while minimizing the risk of data loss, see the following procedure.
1. Temporarily stop applications that write to the database, or put them into a read-only mode.
2. Back up the database.
3. Temporarily disable write access to the database.
4. Verify that any changes that occurred in the database before you blocked write operations are saved to the write-ahead log (WAL), and that the WAL LSN is reflected on the replication slot.
5. Provide the connector with enough time to capture all event records that are written to the replication slot.
   This step ensures that all change events that occurred before the downtime are accounted for, and that they are saved to Kafka.
6. Verify that the connector has finished consuming entries from the replication slot by checking the value of the flushed LSN.
7. Shut down the connector gracefully by stopping Kafka Connect.
   Kafka Connect stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector.
   As an alternative to stopping the entire Kafka Connect cluster, you can stop the connector by deleting it. Do not remove the offset topic, because it might be shared by other Kafka connectors. Later, after you restore write access to the database and you are ready to restart the connector, you must recreate the connector.
8. As a PostgreSQL administrator, drop the replication slot on the primary database server. Do not use the slot.drop.on.stop property to drop the replication slot. This property is for testing only.
9. Stop the database.
10. Perform the upgrade using an approved PostgreSQL upgrade procedure, such as pg_upgrade, or pg_dump and pg_restore.
11. (Optional) Use a standard Kafka tool to remove the connector offsets from the offset storage topic.
    For an example of how to remove connector offsets, see how to remove connector offsets in the Debezium community FAQ.
12. Restart the database.
13. As a PostgreSQL administrator, create a Debezium logical replication slot on the database. You must create the slot before enabling writes to the database. Otherwise, Debezium cannot capture the changes, resulting in data loss.
14. Verify that the publication that defines the tables for Debezium to capture is still present after the upgrade. If the publication is not available, connect to the database as a PostgreSQL administrator to create a new publication.
15. If it was necessary to create a new publication in the previous step, update the Debezium connector configuration to add the name of the new publication to the publication.name property.
16. In the connector configuration, rename the connector.
17. In the connector configuration, set slot.name to the name of the Debezium replication slot.
18. Verify that the new replication slot is available.
19. Restore write access to the database and restart any applications that write to the database.
20. In the connector configuration, set the snapshot.mode property to never, and then restart the connector.
    If you were unable to verify that Debezium finished reading all database changes in Step 6, you can configure the connector to perform a new snapshot by setting snapshot.mode=initial. If necessary, you can confirm whether the connector read all changes from the replication slot by checking the contents of a database backup that was taken immediately before the upgrade.
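The verification in the procedure above, which checks that the flushed LSN of the slot has reached the end of the WAL, amounts to comparing two LSN values. The following sketch shows one way to make that comparison. It assumes that you have already read the two values as text, for example the confirmed_flush_lsn column of the pg_replication_slots view and the result of pg_current_wal_lsn(). The function names and the sample values are hypothetical.

```python
def parse_lsn(text):
    """Convert a PostgreSQL LSN such as '16/B374D848' to an integer byte position.

    An LSN is printed as two hexadecimal numbers: the high 32 bits and the
    low 32 bits of a 64-bit WAL position, separated by a slash.
    """
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def replication_lag_bytes(confirmed_flush_lsn, current_wal_lsn):
    """Number of WAL bytes that the slot consumer has not yet confirmed."""
    return parse_lsn(current_wal_lsn) - parse_lsn(confirmed_flush_lsn)


def slot_has_caught_up(confirmed_flush_lsn, current_wal_lsn):
    """True when the slot has confirmed everything up to the current WAL position."""
    return replication_lag_bytes(confirmed_flush_lsn, current_wal_lsn) <= 0


# Hypothetical values read from the database after writes were blocked.
print(replication_lag_bytes("16/B374D000", "16/B374D848"))
print(slot_has_caught_up("16/B374D848", "16/B374D848"))
```

Treat the result as a guide rather than a guarantee: the WAL can advance for reasons other than table changes, so a small remaining lag does not necessarily mean that change events are still pending.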
Deployment
To deploy a Debezium PostgreSQL connector, you install the Debezium PostgreSQL connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.
Prerequisites
-
Zookeeper, Kafka, and Kafka Connect are installed.
-
PostgreSQL is installed and is set up to run the Debezium connector.
Procedure
-
Download the Debezium PostgreSQL connector plug-in archive.
-
Extract the files into your Kafka Connect environment.
-
Add the directory with the JAR files to Kafka Connect’s plugin.path.
-
Restart your Kafka Connect process to pick up the new JAR files.
If you are working with immutable containers, see Debezium’s Container images for Zookeeper, Kafka, PostgreSQL and Kafka Connect with the PostgreSQL connector already installed and ready to run. You can also run Debezium on Kubernetes and OpenShift.
Connector configuration example
Following is an example of the configuration for a PostgreSQL connector that connects to a PostgreSQL server on port 5432 at 192.168.99.100, whose logical name is fulfillment.
Typically, you configure the Debezium PostgreSQL connector in a JSON file by setting the configuration properties available for the connector.
You can choose to produce events for a subset of the schemas and tables in a database. Optionally, you can ignore, mask, or truncate columns that contain sensitive data, are larger than a specified size, or that you do not need.
{
  "name": "fulfillment-connector", (1)
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "5432", (4)
    "database.user": "postgres", (5)
    "database.password": "postgres", (6)
    "database.dbname": "postgres", (7)
    "topic.prefix": "fulfillment", (8)
    "table.include.list": "public.inventory" (9)
  }
}
1 | The name of the connector when registered with a Kafka Connect service. |
2 | The name of this PostgreSQL connector class. |
3 | The address of the PostgreSQL server. |
4 | The port number of the PostgreSQL server. |
5 | The name of the PostgreSQL user that has the required privileges. |
6 | The password for the PostgreSQL user that has the required privileges. |
7 | The name of the PostgreSQL database to connect to. |
8 | The topic prefix for the PostgreSQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used. |
9 | A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring. |
See the complete list of PostgreSQL connector properties that can be specified in these configurations.
You can send this configuration with a POST command to a running Kafka Connect service.
The service records the configuration and starts one connector task that performs the following actions:
-
Connects to the PostgreSQL database.
-
Reads the transaction log.
-
Streams change event records to Kafka topics.
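As an illustration of sending the configuration with a POST command, the following sketch builds the HTTP request with the Python standard library. The connector definition repeats the example values shown earlier. The Kafka Connect address http://localhost:8083 is an assumption; substitute the address of your own Kafka Connect service. The helper `build_create_request` is hypothetical and only prepares the request, so that nothing is sent until you call `urlopen`.

```python
import json
import urllib.request

# Assumed address of the Kafka Connect REST API; adjust for your environment.
CONNECT_URL = "http://localhost:8083"

connector = {
    "name": "fulfillment-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "192.168.99.100",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "topic.prefix": "fulfillment",
        "table.include.list": "public.inventory",
    },
}


def build_create_request(base_url, definition):
    """Prepare a POST request that registers a connector with Kafka Connect."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


request = build_create_request(CONNECT_URL, connector)
print(request.get_method(), request.full_url)

# To submit the request to a running Kafka Connect service:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

In practice, avoid hard-coding the database password in a script; the literal value here only mirrors the example configuration.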
Adding connector configuration
To run a Debezium PostgreSQL connector, create a connector configuration and add the configuration to your Kafka Connect cluster.
Prerequisites
-
The logical decoding plug-in is installed.
-
The PostgreSQL connector is installed.
Procedure
-
Create a configuration for the PostgreSQL connector.
-
Use the Kafka Connect REST API to add that connector configuration to your Kafka Connect cluster.
After the connector starts, it performs a consistent snapshot of the PostgreSQL server databases that the connector is configured for. The connector then starts generating data change events for row-level operations and streaming change event records to Kafka topics.
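After you add the configuration, you might want to confirm that the connector and its task are running. The Kafka Connect REST API reports this through GET /connectors/{name}/status. The following sketch interprets such a response; the sample document is abbreviated, its values are hypothetical, and the helper `connector_is_running` is an illustration rather than part of any library.

```python
def connector_is_running(status):
    """True when the connector and every one of its tasks report the RUNNING state."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector without any task is not streaming anything yet.
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


# Abbreviated, hypothetical response from GET /connectors/fulfillment-connector/status.
sample_status = {
    "name": "fulfillment-connector",
    "connector": {"state": "RUNNING", "worker_id": "192.168.99.100:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "192.168.99.100:8083"}],
}

print(connector_is_running(sample_status))
```

A task in the FAILED state usually carries a trace field with the underlying error, which is a good first place to look when the function returns False.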
Connector properties
The Debezium PostgreSQL connector has many configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. Information about the properties is organized as follows:
Required Debezium PostgreSQL connector configuration properties
The following configuration properties are required unless a default value is available.
Property | Default | Description | ||
---|---|---|---|---|
No default |
Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors. |
|||
No default |
The name of the Java class for the connector. Always use a value of |
|||
|
The maximum number of tasks that should be created for this connector. The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable. |
|||
|
The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server. Supported values are |
|||
|
The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the Debezium connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." |
|||
|
Whether or not to delete the logical replication slot when the connector stops in a graceful, expected way. The default behavior is that the replication slot remains configured for the connector when the connector stops. When the connector restarts, having the same replication slot enables the connector to start processing where it left off. Set to |
|||
|
Whether or not to create a failover slot. When not specified, or when not connecting to a Postgres 17+ primary, no failover slot will be created. When using failover slots, make sure to add the physical replication slot(s) used for updating read replicas in the cluster to the |
|||
|
The name of the PostgreSQL publication created for streaming changes when using This publication is created at start-up if it does not already exist and it includes all tables. Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to change events for the specific tables of interest. The connector user must have superuser permissions to create this publication, so it is usually preferable to create the publication before starting the connector for the first time. If the publication already exists, either for all tables or configured with a subset of tables, Debezium uses the publication as it is defined. |
|||
No default |
IP address or hostname of the PostgreSQL database server. |
|||
|
Integer port number of the PostgreSQL database server. |
|||
No default |
Name of the PostgreSQL database user for connecting to the PostgreSQL database server. |
|||
No default |
Password to use when connecting to the PostgreSQL database server. |
|||
No default |
The name of the PostgreSQL database from which to stream the changes. |
|||
No default |
Topic prefix that provides a namespace for the particular PostgreSQL database server or cluster in which Debezium is capturing changes.
The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector.
Use only alphanumeric characters, hyphens, dots, and underscores in the database server logical name.
|
|||
No default |
An optional, comma-separated list of regular expressions that match names of schemas for which you want to capture changes.
Any schema name not included in To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression.
That is, the specified expression is matched against the entire identifier for the schema; it does not match substrings that might be present in a schema name. |
|||
No default |
An optional, comma-separated list of regular expressions that match names of schemas for which you do not want to capture changes.
Any schema whose name is not included in To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression.
That is, the specified expression is matched against the entire identifier for the schema; it does not match substrings that might be present in a schema name. |
|||
No default |
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you want to capture.
When this property is set, the connector captures changes only from the specified tables.
Each identifier is of the form schemaName.tableName.
By default, the connector captures changes in every non-system table in each schema whose changes are being captured. To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression.
That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. |
|||
No default |
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture.
Each identifier is of the form schemaName.tableName.
When this property is set, the connector captures changes from every table that you do not specify. To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression.
That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. |
|||
No default |
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values.
Fully-qualified names for columns are of the form schemaName.tableName.columnName. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression.
That is, the expression is used to match the entire name string of the column; it does not match substrings that might be present in a column name. |
|||
No default |
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event record values.
Fully-qualified names for columns are of the form schemaName.tableName.columnName. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression.
That is, the expression is used to match the entire name string of the column; it does not match substrings that might be present in a column name. |
|||
|
Specifies whether to skip publishing messages when there is no change in included columns. This would essentially filter messages if there is no change in columns included as per
|
|||
|
Time, date, and timestamps can be represented with different kinds of precision: |
|||
|
Specifies how the connector should handle values for |
|||
|
Specifies how the connector should handle values for |
|||
|
Specifies how the connector should handle values for |
|||
|
Whether to use an encrypted connection to the PostgreSQL server. Options include: |
|||
No default |
The path to the file that contains the SSL certificate for the client. For more information, see the PostgreSQL documentation. |
|||
No default |
The path to the file that contains the SSL private key of the client. For more information, see the PostgreSQL documentation. |
|||
No default |
The password to access the client private key from the file specified by |
|||
No default |
The path to the file that contains the root certificate(s) against which the server is validated. For more information, see the PostgreSQL documentation. |
|||
No default |
The name of the class that creates SSL sockets.
Use |
|||
|
Enable TCP keep-alive probe to verify that the database connection is still alive. For more information, see the PostgreSQL documentation. |
|||
|
Controls whether a delete event is followed by a tombstone event. |
|||
n/a |
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns.
Set this property if you want to truncate the data in a set of columns when it exceeds the number of characters specified by the length in the property name.
Set The fully-qualified name of a column observes the following format: You can specify multiple properties with different lengths in a single configuration. |
|||
n/a |
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns.
Set this property if you want the connector to mask the values for a set of columns, for example, if they contain sensitive data.
Set The fully-qualified name of a column observes the following format: schemaName.tableName.columnName. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. You can specify multiple properties with different lengths in a single configuration. |
|||
n/a |
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns.
Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>. A pseudonym consists of the hashed value that results from applying the specified hashAlgorithm and salt.
Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms.
Supported hash functions are described in the MessageDigest section of the Java Cryptography Architecture Standard Algorithm Name Documentation. column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName If necessary, the pseudonym is automatically shortened to the length of the column.
The connector configuration can include multiple properties that specify different hash algorithms and salts. |
|||
n/a |
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns for which you want the connector to emit extra parameters that represent column metadata. When this property is set, the connector adds the following fields to the schema of event records:
These parameters propagate a column’s original type name and length (for variable-width types), respectively. The fully-qualified name of a column observes one of the following formats: databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName. |
|||
n/a |
An optional, comma-separated list of regular expressions that specify the fully-qualified names of data types that are defined for columns in a database. When this property is set, for columns with matching data types, the connector emits event records that include the following extra fields in their schema:
These parameters propagate a column’s original type name and length (for variable-width types), respectively. The fully-qualified name of a column observes one of the following formats: databaseName.tableName.typeName, or databaseName.schemaName.tableName.typeName. For the list of PostgreSQL-specific data type names, see the PostgreSQL data type mappings. |
|||
empty string |
A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables. By default, Debezium uses the primary key column of a table as the message key for records that it emits.
In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns. Each fully-qualified table name is a regular expression in the following format: There is no limit to the number of columns that you use to create custom message keys. However, it’s best to use the minimum number that are required to specify a unique key.
|
|||
all_tables |
Specifies whether and how the connector creates a publication.
This setting applies only when the connector streams changes by using the
Specify one of the following values:
|
|||
empty string |
Set this property to apply specific replica identity settings to a subset of the tables that a connector captures, based on the table name. The replica identity values that the property sets overwrite the replica identity values that are set in the database. The property accepts a comma-separated list of key-value pairs. Each key is a regular expression that matches fully-qualified table names; the corresponding value specifies a replica identity type. For example:
Use the following format to specify the fully qualified table name: Set the replica identity to one of the following values:
Example:
schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name
|
|||
bytes |
Specifies how binary (
|
|||
none |
Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Set one of the following values:
|
|||
none |
Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Specify one of the following values:
For more information, see Avro naming. |
|||
|
Specifies how many decimal digits should be used when converting Postgres |
|||
No default |
An optional, comma-separated list of regular expressions that match the names of the logical decoding message prefixes that you want the connector to capture. By default, the connector captures all logical decoding messages. When this property is set, the connector captures only logical decoding messages with the prefixes specified by the property. All other logical decoding messages are excluded. To match the name of a message prefix, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire message prefix string; the expression does not match substrings that might be present in a prefix. If you include this property in the configuration, do not also set the For information about the structure of message events and about their ordering semantics, see message events. |
|||
No default |
An optional, comma-separated list of regular expressions that match the names of the logical decoding message prefixes that you do not want the connector to capture.
When this property is set, the connector does not capture logical decoding messages that use the specified prefixes.
All other messages are captured. To match the name of a message prefix, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire message prefix string; the expression does not match substrings that might be present in a prefix. If you include this property in the configuration, do not also set For information about the structure of message events and about their ordering semantics, see message events. |
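Several properties in the preceding table are described as comma-separated lists of anchored regular expressions. The following sketch illustrates what anchoring means for a table include list: an expression must match the entire identifier, not a substring of it. It uses the Python re module as a stand-in for the regular expression engine that the connector actually uses, so details such as case handling might differ. The function `is_captured` and the table names are hypothetical.

```python
import re


def is_captured(identifier, include_list):
    """Check a schemaName.tableName identifier against a comma-separated pattern list.

    re.fullmatch succeeds only if the pattern matches the whole identifier,
    which mirrors the anchored matching described in the table above.
    """
    patterns = [part.strip() for part in include_list.split(",") if part.strip()]
    return any(re.fullmatch(pattern, identifier) for pattern in patterns)


# The pattern must cover the entire identifier, so a longer table name is not matched.
print(is_captured("public.inventory", "public.inventory"))
print(is_captured("public.inventory_archive", "public.inventory"))

# An unescaped dot matches any character; escape it to match a literal dot.
print(is_captured("publicXinventory", "public.inventory"))
print(is_captured("publicXinventory", r"public\.inventory"))
```

Because an unescaped dot matches any character, escaping the separator between the schema name and the table name keeps a pattern from matching identifiers that you did not intend to include.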
Advanced Debezium PostgreSQL connector configuration properties
The following advanced configuration properties have defaults that work in most situations and therefore rarely need to be specified in the connector’s configuration.
Property | Default | Description | ||
---|---|---|---|---|
No default |
Enumerates a comma-separated list of the symbolic names of the custom converter instances that the connector can use.
For example,
You must set the For each converter that you configure for a connector, you must also add a
For example, isbn.type: io.debezium.test.IsbnConverter If you want to further control the behavior of a configured converter, you can add one or more configuration parameters to pass values to the converter.
To associate any additional configuration parameter with a converter, prefix the parameter names with the symbolic name of the converter. isbn.schema.name: io.debezium.postgresql.type.Isbn |
|||
initial |
Specifies the criteria for performing a snapshot when the connector starts:
If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN is stored, the connector starts streaming changes from the point in time when the PostgreSQL logical replication slot was created on the server. Use this snapshot mode only when you know all data of interest is still reflected in the WAL.
For more information, see the table of |
|||
false |
If the |
|||
false |
If the |
|||
false |
If the |
|||
false |
If the |
|||
false |
If the |
|||
No default |
When |
|||
|
Specifies how the connector holds locks on tables while performing a schema snapshot.
|
|||
No default |
When |
|||
|
Specifies how the connector queries data while performing a snapshot.
This setting enables you to manage snapshot content in a more flexible manner compared to using the |
|||
No default |
When |
|||
All tables specified in |
An optional, comma-separated list of regular expressions that match the fully-qualified names ( To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the table; it does not match substrings that might be present in a table name. |
|||
|
Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If the connector cannot acquire table locks in this time interval, the snapshot fails. How the connector performs snapshots provides details. |
|||
No default |
Specifies the table rows to include in a snapshot. Use the property if you want a snapshot to include only a subset of the rows in a table. This property affects snapshots only. It does not apply to events that the connector reads from the log. The property contains a comma-separated list of fully-qualified table names in the form From a "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customer.orders WHERE delete_flag = 0 ORDER BY id DESC" In the resulting snapshot, the connector includes only the records for which |
|||
|
Specifies how the connector should react to exceptions during processing of events: |
|||
|
Positive integer value that specifies the maximum size of each batch of events that the connector processes. |
|||
|
Positive integer value that specifies the maximum number of records that the blocking queue can hold.
When Debezium reads events streamed from the database, it places the events in the blocking queue before it writes them to Kafka.
The blocking queue can provide backpressure for reading change events from the database
in cases where the connector ingests messages faster than it can write them to Kafka, or when Kafka becomes unavailable.
Events that are held in the queue are disregarded when the connector periodically records offsets.
Always set the value of |
|||
|
A long integer value that specifies the maximum volume of the blocking queue in bytes.
By default, volume limits are not specified for the blocking queue.
To specify the number of bytes that the queue can consume, set this property to a positive long value. |
|||
|
Positive integer value that specifies the number of milliseconds the connector should wait for new change events to appear before it starts processing a batch of events. Defaults to 500 milliseconds. |
|||
|
Specifies connector behavior when the connector encounters a field whose data type is unknown. The default behavior is that the connector omits the field from the change event and logs a warning.
|
|||
No default |
A semicolon-separated list of SQL statements that the connector executes when it establishes a JDBC connection to the database. To use a semicolon as a character and not as a delimiter, specify two consecutive semicolons, |
|||
|
Frequency for sending replication connection status updates to the server, given in milliseconds.
|
|||
|
Controls how frequently the connector sends heartbeat messages to a Kafka topic. The default behavior is that the connector does not send heartbeat messages. |
|||
No default |
Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. |
|||
|
Specify the conditions that trigger a refresh of the in-memory schema for a table. |
|||
No default |
An interval in milliseconds that the connector should wait before performing a snapshot when the connector starts. If you are starting multiple connectors in a cluster, this property is useful for avoiding snapshot interruptions, which might cause re-balancing of connectors. |
|||
0 |
Specifies the time, in milliseconds, that the connector delays the start of the streaming process after it completes a snapshot.
Setting a delay interval helps to prevent the connector from restarting snapshots in the event that a failure occurs immediately after the snapshot completes, but before the streaming process begins.
Set a delay value that is higher than the value of the |
|||
|
During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch. |
|||
No default |
A semicolon-separated list of parameters to pass to the configured logical decoding plug-in. For example, |
|||
|
If connecting to a replication slot fails, this is the maximum number of consecutive attempts to connect. |
|||
|
The number of milliseconds to wait between retry attempts when the connector fails to connect to a replication slot. |
|||
|
Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database.
If the setting of |
|||
|
Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify |
|||
|
Determines whether the connector should commit the LSN of the processed records in the source PostgreSQL database so that the WAL logs can be deleted.
Specify
|
|||
10000 (10 seconds) |
The number of milliseconds to wait before restarting a connector after a retriable error occurs. |
|||
|
A comma-separated list of the operation types that you want the connector to skip during streaming. You can configure the connector to skip the following types of operations:
Set the value to |
|||
No default value |
Fully-qualified name of the data collection that is used to send signals to the connector. |
|||
source |
List of the signaling channel names that are enabled for the connector. By default, the following channels are available:
|
|||
No default |
List of notification channel names that are enabled for the connector. By default, the following channels are available:
|
|||
1024 |
The maximum number of rows that the connector fetches and reads into memory during an incremental snapshot chunk. Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size. However, larger chunk sizes also require more memory to buffer the snapshot data. Adjust the chunk size to a value that provides the best performance in your environment. |
|||
|
Specifies the watermarking mechanism that the connector uses during an incremental snapshot to deduplicate events that might be captured by an incremental snapshot and then recaptured after streaming resumes.
|
|||
|
Specifies whether a connector writes watermarks to the signal data collection to track the progress of an incremental snapshot.
Set the value to |
|||
|
How often, in milliseconds, the connector reads the XMIN from the replication slot.
The XMIN value provides the lower bound of where a new replication slot could start from.
The default value of |
|||
|
The name of the TopicNamingStrategy class that the connector uses to determine the topic names for data change, schema change, transaction, and heartbeat events. Defaults to |
|||
|
Specifies the delimiter for the topic name. Defaults to |
|||
|
The size of the bounded concurrent hash map that holds topic names. This cache helps to determine the topic name that corresponds to a given data collection. |
|||
|
Controls the name of the topic to which the connector sends heartbeat messages. The topic name has this pattern: |
|||
|
Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has this pattern: |
|||
|
Specifies the number of threads that the connector uses when performing an initial snapshot. To enable parallel initial snapshots, set the property to a value greater than 1. In a parallel initial snapshot, the connector processes multiple tables concurrently. This feature is incubating. |
|||
|
Defines tags that customize MBean object names by adding metadata that provides contextual information.
Specify a comma-separated list of key-value pairs.
Each key represents a tag for the MBean object name, and the corresponding value represents a value for the key, for example, k1=v1,k2=v2. The connector appends the specified tags to the base MBean object name. Tags can help you to organize and categorize metrics data. You can define tags to identify particular application instances, environments, regions, versions, and so forth. For more information, see Customized MBean names. |
|||
|
Specifies how the connector responds after an operation that results in a retriable error, such as a connection error.
|
|||
|
Specifies the time, in milliseconds, that the connector waits for a query to complete.
Set the value to |
Pass-through PostgreSQL connector configuration properties
The connector supports pass-through properties that enable Debezium to specify custom configuration options for fine-tuning the behavior of the Apache Kafka producer and consumer. For information about the full range of configuration properties for Kafka producers and consumers, see the Kafka documentation.
Pass-through properties for configuring how the PostgreSQL connector interacts with the Kafka signaling topic
Debezium provides a set of signal.* properties that control how the connector interacts with the Kafka signals topic.
The following table describes the Kafka signal properties.
Property | Default | Description |
---|---|---|
<topic.prefix>-signal |
The name of the Kafka topic that the connector monitors for ad hoc signals.
|
|||
kafka-signal |
The name of the group ID that is used by Kafka consumers. |
|||
No default |
A list of the host and port pairs that the connector uses to establish its initial connection to the Kafka cluster. Each pair references the Kafka cluster that is used by the Debezium Kafka Connect process. |
|||
|
An integer value that specifies the maximum number of milliseconds that the connector waits when polling signals. |
Pass-through properties for configuring the Kafka consumer client for the signaling channel
The Debezium connector provides for pass-through configuration of the signals Kafka consumer.
Pass-through signals properties begin with the prefix signal.consumer.*.
For example, the connector passes properties such as signal.consumer.security.protocol=SSL to the Kafka consumer.
Debezium strips the prefixes from the properties before it passes the properties to the Kafka signals consumer.
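The prefix handling described above can be illustrated with a minimal Python sketch. It is not Debezium's implementation. The helper name `strip_prefix` and the sample values are hypothetical, and the sketch shows only the general idea of selecting the prefixed keys and removing the prefix before the properties are handed to the client.

```python
def strip_prefix(config, prefix):
    """Return the entries whose key starts with `prefix`, with the prefix removed."""
    return {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix)
    }


# Hypothetical connector configuration; only the prefixed keys are passed through.
connector_config = {
    "topic.prefix": "fulfillment",
    "signal.consumer.security.protocol": "SSL",
    "signal.consumer.fetch.min.bytes": "1",
}

consumer_config = strip_prefix(connector_config, "signal.consumer.")
print(consumer_config)
# {'security.protocol': 'SSL', 'fetch.min.bytes': '1'}
```

The same idea applies to any other pass-through prefix: a key without the prefix, such as topic.prefix in this sample, is not forwarded.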
Pass-through properties for configuring the PostgreSQL connector sink notification channel
The following table describes properties that you can use to configure the Debezium sink notification channel.
Property | Default | Description |
---|---|---|
No default |
The name of the topic that receives notifications from Debezium.
This property is required when you configure the |
Debezium connector pass-through database driver configuration properties
The Debezium connector provides for pass-through configuration of the database driver.
Pass-through database properties begin with the prefix driver.*.
For example, the connector passes properties such as driver.foobar=false to the JDBC URL.
Debezium strips the prefixes from the properties before it passes the properties to the database driver.
Monitoring
The Debezium PostgreSQL connector provides two types of metrics in addition to the built-in support for JMX metrics that ZooKeeper, Kafka, and Kafka Connect provide.
-
Snapshot metrics provide information about connector operation while performing a snapshot.
-
Streaming metrics provide information about connector operation when the connector is capturing changes and streaming change event records.
Debezium monitoring documentation provides details for how to expose these metrics by using JMX.
Customized MBean names
Debezium connectors expose metrics via the MBean name for the connector. These metrics, which are specific to each connector instance, provide data about the behavior of the connector’s snapshot, streaming, and schema history processes.
By default, when you deploy a correctly configured connector, Debezium generates a unique MBean name for each of the different connector metrics. To view the metrics for a connector process, you configure your observability stack to monitor its MBean. But these default MBean names depend on the connector configuration; configuration changes can result in changes to the MBean names. A change to the MBean name breaks the linkage between the connector instance and the MBean, disrupting monitoring activity. In this scenario, you must reconfigure the observability stack to use the new MBean name if you want to resume monitoring.
To prevent monitoring disruptions that result from MBean name changes, you can configure custom metrics tags.
You configure custom metrics tags by adding the custom.metric.tags property to the connector configuration.
The property accepts key-value pairs in which each key represents a tag for the MBean object name, and the corresponding value represents the value of that tag.
For example: k1=v1,k2=v2.
Debezium appends the specified tags to the MBean name of the connector.
After you configure the custom.metric.tags
property for a connector, you can configure the observability stack to retrieve metrics associated with the specified tags.
The observability stack then uses the specified tags, rather than the mutable MBean names, to uniquely identify connectors.
Later, if Debezium redefines how it constructs MBean names, or if the topic.prefix
in the connector configuration changes, metrics collection is uninterrupted,
because the metrics scrape task uses the specified tag patterns to identify the connector.
A further benefit of using custom tags is that you can use tags that reflect the architecture of your data pipeline, so that metrics are organized in a way that suits your operational needs.
For example, you might specify tags with values that declare the type of connector activity, the application context, or the data source, such as db1-streaming-for-application-abc.
If you specify multiple key-value pairs, all of the specified pairs are appended to the connector’s MBean name.
The following example illustrates how tags modify the default MBean name.
By default, the PostgreSQL connector uses the following MBean name for streaming metrics:
debezium.postgres:type=connector-metrics,context=streaming,server=<topic.prefix>
If you set the value of custom.metric.tags to database=salesdb-streaming,table=inventory, Debezium generates the following custom MBean name:
debezium.postgres:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
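As a rough illustration of the tag mechanics, the following Python sketch parses a custom.metric.tags value, appends the tags to a base MBean name, and then checks whether a name carries a given set of tags, which is roughly what a metrics scrape rule does. The function names are hypothetical and the logic is an assumption for illustration only, not the connector's implementation. The base name is the streaming MBean name given in the Streaming metrics section.

```python
def parse_tags(raw):
    """Parse a 'k1=v1,k2=v2' string into a dict that preserves the given order."""
    tags = {}
    for pair in raw.split(","):
        pair = pair.strip()
        if not pair:
            continue
        key, sep, value = pair.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"malformed tag: {pair!r}")
        tags[key.strip()] = value.strip()
    return tags


def mbean_name(base, raw_tags):
    """Append every parsed tag to the base MBean name as a key=value property."""
    return base + "".join(f",{k}={v}" for k, v in parse_tags(raw_tags).items())


def has_tags(name, wanted):
    """Return True if the MBean name contains every wanted key=value property."""
    _, _, props = name.partition(":")
    found = dict(p.split("=", 1) for p in props.split(",") if "=" in p)
    return all(found.get(k) == v for k, v in wanted.items())


# Base name as listed in the Streaming metrics section; <topic.prefix> is a placeholder.
base = "debezium.postgres:type=connector-metrics,context=streaming,server=<topic.prefix>"
name = mbean_name(base, "database=salesdb-streaming,table=inventory")
print(name)
print(has_tags(name, {"database": "salesdb-streaming"}))
```

Matching on tags instead of on the full name means that the check keeps working if the server value in the name changes.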
Snapshot metrics
The MBean is debezium.postgres:type=connector-metrics,context=snapshot,server=<topic.prefix>.
Snapshot metrics are exposed only while a snapshot operation is active, or if a snapshot has occurred since the last connector start.
The following table lists the snapshot metrics that are available.
Attributes | Type | Description |
---|---|---|
|
The last snapshot event that the connector has read. |
|
|
The number of milliseconds since the connector has read and processed the most recent event. |
|
|
The total number of events that this connector has seen since last started or reset. |
|
|
The number of events that have been filtered by include/exclude list filtering rules configured on the connector. |
|
|
The list of tables that are captured by the connector. |
|
|
The length of the queue used to pass events between the snapshotter and the main Kafka Connect loop. |
|
|
The free capacity of the queue used to pass events between the snapshotter and the main Kafka Connect loop. |
|
|
The total number of tables that are being included in the snapshot. |
|
|
The number of tables that the snapshot has yet to copy. |
|
|
Whether the snapshot was started. |
|
|
Whether the snapshot was paused. |
|
|
Whether the snapshot was aborted. |
|
|
Whether the snapshot completed. |
|
|
The total number of seconds that the snapshot has taken so far, even if it is not complete. Also includes the time during which the snapshot was paused. |
|
|
The total number of seconds that the snapshot was paused. If the snapshot was paused several times, the paused time adds up. |
|
|
Map containing the number of rows scanned for each table in the snapshot. Tables are incrementally added to the Map during processing. Updates every 10,000 rows scanned and upon completing a table. |
|
|
The maximum buffer of the queue in bytes. This metric is available if |
|
|
The current volume, in bytes, of records in the queue. |
The connector also provides the following additional snapshot metrics when an incremental snapshot is executed:
Attributes | Type | Description |
---|---|---|
|
The identifier of the current snapshot chunk. |
|
|
The lower bound of the primary key set defining the current chunk. |
|
|
The upper bound of the primary key set defining the current chunk. |
|
|
The lower bound of the primary key set of the currently snapshotted table. |
|
|
The upper bound of the primary key set of the currently snapshotted table. |
Streaming metrics
The MBean is debezium.postgres:type=connector-metrics,context=streaming,server=<topic.prefix>.
The following table lists the streaming metrics that are available.
Attributes | Type | Description |
---|---|---|
|
The last streaming event that the connector has read. |
|
|
The number of milliseconds since the connector has read and processed the most recent event. |
|
|
The total number of data change events reported by the source database since the last connector start, or since a metrics reset. Represents the data change workload for Debezium to process. |
|
|
The total number of create events processed by the connector since its last start or metrics reset. |
|
|
The total number of update events processed by the connector since its last start or metrics reset. |
|
|
The total number of delete events processed by the connector since its last start or metrics reset. |
|
|
The number of events that have been filtered by include/exclude list filtering rules configured on the connector. |
|
|
The list of tables that are captured by the connector. |
|
|
The length of the queue used to pass events between the streamer and the main Kafka Connect loop. |
|
|
The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop. |
|
|
Flag that denotes whether the connector is currently connected to the database server. |
|
|
The number of milliseconds between the last change event’s timestamp and the connector processing it. The value incorporates any difference between the clocks on the machines where the database server and the connector are running. |
|
|
The number of processed transactions that were committed. |
|
|
The coordinates of the last received event. |
|
|
Transaction identifier of the last processed transaction. |
|
|
The maximum buffer of the queue in bytes. This metric is available if |
|
|
The current volume, in bytes, of records in the queue. |
Behavior when things go wrong
Debezium is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully, Debezium provides exactly-once delivery of every change event record. If a fault occurs, the system does not lose any events. However, while it recovers from the fault, the connector might emit some duplicate change events. In these abnormal situations, Debezium, like Kafka, provides at-least-once delivery of change events.
The rest of this section describes how Debezium handles various kinds of faults and problems.
Configuration and startup errors
In the following situations, the connector fails when trying to start, reports an error or exception in the log, and stops running:
-
The connector’s configuration is invalid.
-
The connector cannot successfully connect to PostgreSQL by using the specified connection parameters.
-
The connector is restarting from a previously-recorded position in the PostgreSQL WAL (by using the LSN) and PostgreSQL no longer has that history available.
In these cases, the error message has details about the problem and possibly a suggested workaround. After you correct the configuration or address the PostgreSQL problem, restart the connector.
PostgreSQL becomes unavailable
When the connector is running, the PostgreSQL server that it is connected to could become unavailable for any number of reasons. If this happens, the connector fails with an error and stops. When the server is available again, restart the connector.
The PostgreSQL connector externally stores the last processed offset in the form of a PostgreSQL LSN. After a connector restarts and connects to a server instance, the connector communicates with the server to continue streaming from that particular offset. This offset is available as long as the Debezium replication slot remains intact. Never drop a replication slot on the primary server or you will lose data. For information about failure cases in which a slot has been removed, see the next section.
Cluster failures
In versions earlier than 16, PostgreSQL allows logical replication slots only on primary servers. This means that you can point a Debezium PostgreSQL connector to only the active primary server of a database cluster. Also, replication slots themselves are not propagated to replicas. If the primary server goes down, a new primary must be promoted.
In version 16 or newer, you can also create logical replication slots on replicas, but you must keep them in sync with the corresponding slot on the primary server yourself.
In version 17 or newer, replication slots on a primary can be enabled for automatic failover. In this case, PostgreSQL itself can synchronize a slot from the primary to a replica automatically, so that Debezium can continue reading from that slot after a replica is promoted to the new primary, without missing any change events.
Some managed PostgreSQL services (AWS RDS and GCP CloudSQL, for example) implement replication to a standby via disk replication. This means that the replication slot does get replicated and will remain available after a failover.
The new primary must have the logical decoding plug-in installed and a replication slot that is configured for use by the plug-in and the database for which you want to capture changes. Only then can you point the connector to the new server and restart the connector.
If you are not using failover replication slots, which PostgreSQL 17 and newer support, important caveats apply when failovers occur. Pause Debezium until you can verify that you have an intact replication slot that has not lost data. After a failover:
-
There must be a process that re-creates the Debezium replication slot before allowing the application to write to the new primary. This is crucial. Without this process, your application can miss change events.
-
You might need to verify that Debezium was able to read all changes in the slot before the old primary failed.
One reliable method of recovering and verifying whether any changes were lost is to recover a backup of the failed primary to the point immediately before it failed. While this can be administratively difficult, it allows you to inspect the replication slot for any unconsumed changes.
Kafka Connect process stops gracefully
Suppose that Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully. Prior to shutting down that process, Kafka Connect migrates the process’s connector tasks to another Kafka Connect process in that group. The new connector tasks start processing exactly where the prior tasks stopped. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
Kafka Connect process crashes
If the Kafka Connect process stops unexpectedly, any connector tasks it was running terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, Kafka Connect restarts those connector tasks on other processes. However, PostgreSQL connectors resume from the last offset that was recorded by the earlier processes. This means that the new replacement tasks might generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.
Because there is a chance that some events might be duplicated during a recovery from failure, consumers should always anticipate some duplicate events. Debezium changes are idempotent, so a sequence of events always results in the same state.
In each change event record, Debezium connectors insert source-specific information about the origin of the event, including the PostgreSQL server’s time of the event, the ID of the server transaction, and the position in the write-ahead log where the transaction changes were written. Consumers can keep track of this information, especially the LSN, to determine whether an event is a duplicate.
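One way a consumer might use this source information is sketched below in Python. This is an assumption-laden illustration, not a recommended or official approach. It assumes that each event's source block exposes the fields schema, table, txId, and lsn, and that this combination identifies a change well enough for deduplication. The class name `DuplicateFilter`, the capacity, and the sample values are all hypothetical. The filter remembers recently seen keys rather than comparing against a single highest LSN, so it does not rely on events arriving in strictly increasing LSN order.

```python
from collections import OrderedDict


class DuplicateFilter:
    """Remembers recently seen change events so that replayed ones can be skipped."""

    def __init__(self, capacity=10_000):
        self._capacity = capacity
        self._seen = OrderedDict()  # insertion-ordered, oldest key first

    def is_duplicate(self, event):
        source = event["source"]
        # Assumed identity of a change: where it happened and in which transaction.
        key = (source["schema"], source["table"], source["txId"], source["lsn"])
        if key in self._seen:
            return True
        self._seen[key] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # evict the oldest remembered key
        return False


def make_event(tx_id, lsn, op):
    """Build a simplified, hypothetical change event with only the fields used here."""
    source = {"schema": "public", "table": "customers", "txId": tx_id, "lsn": lsn}
    return {"op": op, "source": source}


# The third event repeats the first, as could happen after a crash and restart.
events = [
    make_event(556, 24023128, "c"),
    make_event(556, 24023184, "u"),
    make_event(556, 24023128, "c"),
]
dedup = DuplicateFilter()
delivered = [e for e in events if not dedup.is_duplicate(e)]
print([e["source"]["lsn"] for e in delivered])
# [24023128, 24023184]
```

The capacity must be large enough to cover the window of events that can be replayed after a crash, which, as noted above, depends on the offset flush period and the change volume.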
Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Periodically, at a frequency that you specify in the Kafka Connect configuration, Kafka Connect records the latest offset that appears in those change events. If the Kafka brokers become unavailable, the Kafka Connect process that is running the connectors repeatedly tries to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be re-established, at which point the connectors resume exactly where they left off.
Connector is stopped for a duration
If the connector is gracefully stopped, the database can continue to be used. Any changes are recorded in the PostgreSQL WAL. When the connector restarts, it resumes streaming changes where it left off. That is, it generates change event records for all database changes that were made while the connector was stopped.
A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written according to Kafka best practices, and given enough resources a Kafka Connect connector can also handle very large numbers of database change events. Because of this, after being stopped for a while, when a Debezium connector restarts, it is very likely to catch up with the database changes that were made while it was stopped. How quickly this happens depends on the capabilities and performance of Kafka and the volume of changes being made to the data in PostgreSQL.