Debezium Connector for PostgreSQL

Debezium’s PostgreSQL Connector can monitor and record the row-level changes in the schemas of a PostgreSQL database. This connector was added in Debezium 0.4.0.

The first time it connects to a PostgreSQL server/cluster, it reads a consistent snapshot of all of the schemas. When that snapshot is complete, the connector continuously streams the changes that were committed to PostgreSQL 9.6 or later and generates corresponding insert, update and delete events. All of the events for each table are recorded in a separate Kafka topic, where they can be easily consumed by applications and services.

Overview

PostgreSQL’s logical decoding feature was first introduced in version 9.4. It is a mechanism that allows the extraction of the changes committed to the transaction log and the processing of these changes in a user-friendly manner with the help of an output plugin. The output plugin must be installed before the PostgreSQL server is started, and it must be enabled together with a replication slot so that clients can consume the changes.

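Debezium’s PostgreSQL connector contains two different parts that work together to read and process server changes:

  • a logical decoding output plugin, which has to be installed and configured in the PostgreSQL server

  • the connector itself, which acts as a PostgreSQL client and reads the changes produced by the plugin using PostgreSQL’s replication protocol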

The connector then produces a change event for every row-level insert, update, and delete operation that was received, recording all the change events for each table in a separate Kafka topic. Your client applications read the Kafka topics that correspond to the database tables they’re interested in following, and react to every row-level event they see in those topics.

PostgreSQL normally purges WAL segments after some period of time. This means that the connector won’t have the complete history of all changes that have been made to the database. Therefore, when the PostgreSQL connector first connects to a particular PostgreSQL database, it starts by performing a consistent snapshot of each of the database schemas. After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made. This way, we start with a consistent view of all of the data, yet continue reading without having lost any of the changes made while the snapshot was taking place.

The connector is also tolerant of failures. As the connector reads changes and produces events, it records the position in the write-ahead log with each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart it simply continues reading the WAL where it last left off. This includes snapshots: if the snapshot was not completed when the connector is stopped, upon restart it will begin a new snapshot.

The connector’s functionality relies on PostgreSQL’s logical decoding feature. Since this is a relatively new feature, it has some limitations which are also reflected by the connector:

  1. Logical Decoding does not support DDL changes: this means that the connector is unable to report DDL change events back to consumers.

  2. Logical Decoding replication slots are only supported on master servers: this means that when there is a cluster of PostgreSQL servers, the connector can only run on the active master server. It cannot run on hot or warm standby replicas. If the master server fails or is demoted, the connector will stop. Once the master has recovered, the connector can simply be restarted. If a different PostgreSQL server has been promoted to master, the connector configuration must be adjusted before the connector is restarted. Make sure you read more about how the connector behaves when things go wrong.

Setting up PostgreSQL

Before using the Debezium PostgreSQL connector to monitor the changes committed on a PostgreSQL server, first install the logical decoding plugin into the PostgreSQL server, enable a replication slot, and configure a user with sufficient privileges to perform the replication.

Note that if your database is hosted by a service such as Amazon RDS or Heroku Postgres, you may be unable to install the plugin. If so, you’ll be unable to monitor your database with Debezium (support for RDS is planned).

Installing the logical decoding output plugin

As of PostgreSQL 9.4, the only way to read changes to the write-ahead log is to first install a logical decoding output plugin. Plugins are written in C, compiled, and installed on the machine that runs the PostgreSQL server. Plugins use a number of PostgreSQL-specific APIs, as described by the PostgreSQL documentation.

Debezium’s PostgreSQL connector works with Debezium’s logical decoding plugin, which encodes the changes in Protobuf format. The plugin’s documentation describes the plugin’s requirements, limitations, and how to compile it.

For simplicity, Debezium also provides a Docker image based on a vanilla PostgreSQL server image on top of which it compiles and installs the plugin. We recommend using this image as an example of the detailed steps required for the installation.

The Debezium logical decoding plugin has only been installed and tested on Linux machines. Windows and other operating systems may require different installation steps.

Configuring the PostgreSQL server

Once the plugin has been installed, configure the server to load the plugin at startup and to define replication slots:

postgresql.conf

# MODULES
shared_preload_libraries = 'decoderbufs'   # (1)

# REPLICATION
wal_level = logical             # (2)
max_wal_senders = 1             # (3)
max_replication_slots = 1       # (4)
1 tells the server to load the decoderbufs logical decoding plugin at startup (the name of the plugin is set in its Makefile)
2 tells the server to use logical decoding with the write-ahead log
3 tells the server to use at most 1 separate process for processing WAL changes
4 tells the server to allow at most 1 replication slot to be created for streaming WAL changes
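Before restarting the server, it can help to sanity-check the settings above. The following is a minimal sketch, not a Debezium or PostgreSQL tool: `parse_conf` and `check_logical_decoding` are hypothetical helpers that only understand simple `key = value` lines, and they treat a missing `max_wal_senders` or `max_replication_slots` as 0 for simplicity.

```python
from typing import Dict, List


def parse_conf(text: str) -> Dict[str, str]:
    """Parse simple `key = value` lines of a postgresql.conf fragment.

    Hypothetical helper: it ignores comments and blank lines, but does not
    handle include directives, `key value` lines without '=', or '#'
    characters inside quoted values.
    """
    settings: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and callouts
        key, sep, value = line.partition("=")
        if not sep:
            continue  # blank or unsupported line
        settings[key.strip()] = value.strip().strip("'")
    return settings


def check_logical_decoding(settings: Dict[str, str], plugin: str = "decoderbufs") -> List[str]:
    """Return a list of problems; an empty list means the settings look usable."""
    problems = []
    libraries = [lib.strip() for lib in settings.get("shared_preload_libraries", "").split(",")]
    if plugin not in libraries:
        problems.append(f"shared_preload_libraries does not load {plugin}")
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    for key in ("max_wal_senders", "max_replication_slots"):
        # A missing value is treated as 0 here; that is a simplification.
        if int(settings.get(key, "0")) < 1:
            problems.append(f"{key} must be at least 1")
    return problems
```

Running `check_logical_decoding(parse_conf(text))` on the fragment above should return an empty list; each string in a non-empty result names one setting to fix.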

We strongly recommend reading and understanding the official documentation regarding the mechanics and configuration of the PostgreSQL write-ahead log.

Setting up permissions

Replication can only be performed by a database user that has appropriate permissions and only for a configured number of hosts.

In order to give a user replication permissions, define a PostgreSQL role that has at least the REPLICATION and LOGIN permissions. For example:

CREATE ROLE name REPLICATION LOGIN;

Superusers have both of the above attributes by default.

Finally, configure the PostgreSQL server to allow replication to take place between the server machine and the host on which the Debezium PostgreSQL connector is running:

pg_hba.conf

local   replication     <youruser>                          trust   # (1)
host    replication     <youruser>  127.0.0.1/32            trust   # (2)
host    replication     <youruser>  ::1/128                 trust   # (3)
1 tells the server to allow replication for <youruser> locally (i.e. on the server machine)
2 tells the server to allow <youruser> on localhost to receive replication changes using IPv4
3 tells the server to allow <youruser> on localhost to receive replication changes using IPv6
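The address column of a `host` line is an address with a network mask in CIDR notation: `/32` (IPv4) and `/128` (IPv6) match exactly one address, while a shorter mask such as `/24` matches a whole subnet. As a rough illustration (not how PostgreSQL itself evaluates the file, and ignoring keywords such as `samehost`), Python’s standard `ipaddress` module can show which client addresses a given entry covers; `host_matches` is a hypothetical helper name.

```python
import ipaddress


def host_matches(client_ip: str, address_mask: str) -> bool:
    """Return True if client_ip falls inside a pg_hba.conf address/mask entry.

    Membership is always False across IP versions, so an IPv4 client never
    matches an IPv6 entry such as ::1/128.
    """
    network = ipaddress.ip_network(address_mask, strict=False)
    return ipaddress.ip_address(client_ip) in network
```

For example, a connector running on another machine at 192.168.1.17 would need an entry such as `192.168.1.0/24` (or `192.168.1.17/32`) rather than the localhost entries shown above.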

See the PostgreSQL documentation for more information on network masks.

Supported PostgreSQL topologies

The PostgreSQL connector can be used with a standalone PostgreSQL server or with a cluster of PostgreSQL servers.

As mentioned in the beginning, PostgreSQL 9.6 only supports logical replication slots on master servers. This means that a replica in a PostgreSQL cluster cannot be configured for logical replication, and consequently that the Debezium PostgreSQL Connector can only connect and communicate with the master server. Should this server fail, the connector will stop. When the cluster is repaired, if the original master server is once again promoted to master, the connector can simply be restarted. However, if a different PostgreSQL server with the plugin and proper configuration is promoted to master, the connector configuration must be changed to point to the new master server, and the connector can then be restarted.

Snapshots

Most PostgreSQL servers are configured not to retain the complete history of the database in the WAL segments, so the PostgreSQL connector would be unable to see the entire history of the database by simply reading the WAL. Therefore, by default, the connector performs an initial consistent snapshot of the database when it first starts up. Each snapshot consists of the following steps:

  1. Start a transaction with the SERIALIZABLE isolation level in READ ONLY, DEFERRABLE mode to ensure that all subsequent reads within this transaction are done against a single consistent version of the data. Any changes to the data due to subsequent INSERT, UPDATE, and DELETE operations by other clients will not be visible to this transaction.

  2. Obtain a SHARE UPDATE EXCLUSIVE MODE lock on each of the monitored tables to ensure that no structural changes can occur to any of the tables while the snapshot is taking place. Note that these locks do not prevent INSERT, UPDATE, and DELETE operations on the tables from taking place during the snapshot.

  3. Read the current position in the server’s transaction log.

  4. Scan all of the database tables and schemas, and generate a READ event for each row and write that event to the appropriate table-specific Kafka topic.

  5. Commit the transaction.

  6. Record the successful completion of the snapshot in the connector offsets.
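Steps 1 to 5 map onto ordinary SQL statements. The sketch below only builds the statement sequence, to make the ordering visible; it is an illustration, not the connector’s actual implementation. It assumes PostgreSQL 9.6, where the current WAL position is returned by `pg_current_xlog_location()` (PostgreSQL 10 renamed this function to `pg_current_wal_lsn()`), and it assumes the table names are already trusted, properly quoted identifiers. Step 6 happens outside PostgreSQL, in the connector offsets.

```python
from typing import List


def snapshot_statements(tables: List[str]) -> List[str]:
    """Return the SQL for snapshot steps 1-5, in order (illustrative only)."""
    # Step 1: one consistent, read-only view of the data.
    statements = ["BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE"]
    # Step 2: block structural changes (but not INSERT/UPDATE/DELETE).
    statements += [f"LOCK TABLE {t} IN SHARE UPDATE EXCLUSIVE MODE" for t in tables]
    # Step 3: remember the WAL position from which streaming will resume.
    statements.append("SELECT pg_current_xlog_location()")
    # Step 4: read every row; each row becomes a READ event.
    statements += [f"SELECT * FROM {t}" for t in tables]
    # Step 5: end the snapshot transaction.
    statements.append("COMMIT")
    return statements
```

The important design point is that the position in step 3 is read inside the same transaction as the table scans in step 4, so streaming from that position picks up exactly the changes the snapshot could not see.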

If the connector fails, is rebalanced, or stops after Step 1 begins but before Step 6 completes, upon restart the connector will begin a new snapshot. Once the connector does complete its initial snapshot, the PostgreSQL connector then continues streaming from the position read during step 3, ensuring that it does not miss any updates. If the connector stops again for any reason, upon restart it will simply continue streaming changes from where it previously left off. However, if the connector remains stopped for long enough, PostgreSQL might purge older WAL segments and the connector’s last known position may be lost. In this case, when the connector configured with initial snapshot mode (the default) is finally restarted, the PostgreSQL server will no longer have the starting point and the connector will not be able to relay the changes that are not available in the write ahead log.

A second snapshot mode tells the connector to always perform a snapshot when it starts up and, after the snapshot completes, to continue streaming changes from the position read in step 3 of the above sequence. This mode can be used when it’s known that some WAL segments have been deleted and are no longer available, or after a cluster failure in which a new master has been promoted, so that the connector doesn’t miss any changes that could have taken place after the new master was promoted but before the connector was restarted on the new master.

The third snapshot mode instructs the connector to never perform snapshots. When a new connector is configured this way, it will either continue streaming changes from a previously stored offset or start from the point in time when the PostgreSQL logical replication slot was first created on the server. Note that this mode is useful only when you know that all data of interest is still reflected in the WAL.

The final snapshot mode, initial only, will perform a database snapshot and then stop before streaming any other changes. If the connector had started but did not complete a snapshot before stopping, the connector will restart the snapshot process and stop once the snapshot completes.
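The four modes differ only in two decisions made at startup: whether to snapshot, and whether to stream afterwards. The sketch below models those decisions as described in this section. The enum labels and the `plan_startup` function are hypothetical names chosen for illustration; do not read them as the connector’s exact configuration values.

```python
from enum import Enum
from typing import Tuple


class SnapshotMode(Enum):
    # Hypothetical labels for the four modes described above.
    INITIAL = "initial"
    ALWAYS = "always"
    NEVER = "never"
    INITIAL_ONLY = "initial_only"


def plan_startup(mode: SnapshotMode, snapshot_completed: bool) -> Tuple[bool, bool]:
    """Return (take_snapshot, stream_afterwards) for a starting connector.

    snapshot_completed is True when the stored offsets record a finished snapshot.
    """
    if mode is SnapshotMode.ALWAYS:
        return True, True
    if mode is SnapshotMode.NEVER:
        return False, True
    if mode is SnapshotMode.INITIAL_ONLY:
        return not snapshot_completed, False
    # INITIAL (the default): snapshot only until one has completed.
    return not snapshot_completed, True
```

Note that an interrupted snapshot counts as "not completed" in every mode, which is why a restart after a failure between steps 1 and 6 begins a new snapshot.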

Streaming changes

The PostgreSQL connector will typically spend the vast majority of its time streaming changes from the PostgreSQL server to which it is connected. This mechanism relies on PostgreSQL’s replication protocol, in which the client can receive changes from the server as they are committed in the server’s transaction log at certain positions, also known as Log Sequence Numbers (LSNs).
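PostgreSQL displays an LSN as two hexadecimal numbers separated by a slash (for example `0/16E9058`): the upper and lower 32 bits of a 64-bit position. The `lsn` values shown in the meta information below are plain integers. Assuming that integer is the same 64-bit position, these two hypothetical helpers convert between the forms.

```python
def lsn_to_int(text: str) -> int:
    """Convert PostgreSQL's textual LSN ("hi/lo", both hex) to a 64-bit integer."""
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def int_to_lsn(value: int) -> str:
    """Convert a 64-bit integer position back to PostgreSQL's "hi/lo" form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"
```

Under that assumption, the example offset `24023128` corresponds to `0/16E9058`, which is handy when comparing connector offsets with the output of PostgreSQL’s own monitoring functions.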

Whenever the server commits a transaction, a separate server process invokes a callback function from the logical decoding plugin. This function processes the changes from the transaction, converts them to a specific format (Protobuf in the case of the Debezium plugin), and writes them to an output stream, which can then be consumed by clients.

The PostgreSQL connector acts as a PostgreSQL client, and when it receives these changes it transforms the events into Debezium create, update, or delete events that include the LSN position of the event. The PostgreSQL connector forwards these change events to the Kafka Connect framework (running in the same process), which then asynchronously writes them in the same order to the appropriate Kafka topic. Kafka Connect uses the term offset for the source-specific position information that Debezium includes with each event, and Kafka Connect periodically records the most recent offset in another Kafka topic.

When Kafka Connect gracefully shuts down, it stops the connectors, flushes all events to Kafka, and records the last offset received from each connector. Upon restart, Kafka Connect reads the last recorded offset for each connector and starts the connector from that point. The PostgreSQL connector uses the LSN recorded in each change event as the offset, so that upon restart the connector requests that the PostgreSQL server send it the events starting just after that position.
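The resume-after-offset behavior can be modeled in a few lines. In this toy sketch (all names hypothetical), a list of `(lsn, change)` pairs stands in for the WAL and a dict stands in for the Kafka topic where Kafka Connect stores offsets. For simplicity it records the offset immediately after each event, which matches a graceful shutdown; after a crash, real Kafka Connect only has the most recent periodically recorded offset, so some events may be delivered again.

```python
from typing import Dict, List, Optional, Tuple


def stream(wal: List[Tuple[int, str]], offsets: Dict[str, int], connector: str,
           stop_after: Optional[int] = None) -> List[str]:
    """Deliver changes after the connector's last recorded offset.

    wal is a list of (lsn, change) pairs in log order; offsets stands in for
    the Kafka Connect offsets topic. stop_after simulates the connector
    stopping after that many events.
    """
    last = offsets.get(connector)
    delivered: List[str] = []
    for lsn, change in wal:
        if last is not None and lsn <= last:
            continue  # already delivered before the restart
        if stop_after is not None and len(delivered) >= stop_after:
            break
        delivered.append(change)
        offsets[connector] = lsn  # toy: recorded immediately, not periodically
    return delivered
```

Calling `stream` a second time with the same `offsets` dict delivers only the changes after the last recorded LSN, which is the behavior described above.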

Topic names

The PostgreSQL connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the name of each Kafka topic takes the form serverName.schemaName.tableName, where serverName is the logical name of the connector as specified with the database.server.name configuration property, schemaName is the name of the database schema where the operation occurred, and tableName is the name of the database table on which the operation occurred.

For example, consider a PostgreSQL installation with a postgres database and an inventory schema that contains four tables: products, products_on_hand, customers, and orders. If the connector monitoring this database were given a logical server name of fulfillment, then the connector would produce events on these four Kafka topics:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

If on the other hand the tables were not part of a specific schema but rather created in the default public PostgreSQL schema, then the name of the Kafka topics would be:

  • fulfillment.public.products

  • fulfillment.public.products_on_hand

  • fulfillment.public.customers

  • fulfillment.public.orders
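The default naming rule is a simple concatenation, sketched here with a hypothetical `topic_name` helper (it ignores the character replacement described under Events):

```python
from typing import List


def topic_name(server_name: str, schema_name: str, table_name: str) -> str:
    """Default topic name: serverName.schemaName.tableName."""
    return f"{server_name}.{schema_name}.{table_name}"


def topics_for(server_name: str, schema_name: str, tables: List[str]) -> List[str]:
    """Topic names for every monitored table in one schema."""
    return [topic_name(server_name, schema_name, t) for t in tables]
```

For example, `topics_for("fulfillment", "inventory", ["products", "orders"])` yields the first and last topics of the inventory list above.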

Meta information

Each record produced by the PostgreSQL connector has, in addition to the database event, some meta-information about where the event occurred on the server, the name of the source partition and the name of the Kafka topic and partition where the event should be placed:

{
    "sourcePartition": {
        "server": "fulfillment"
    },
    "sourceOffset": {
        "lsn": "24023128",
        "txId": "555",
        "ts_usec": "1482918357011699"
    },
    "kafkaPartition": null
}

The PostgreSQL connector uses only one Kafka Connect partition and places the generated events into one Kafka partition. Therefore, the sourcePartition always contains the value of the database.server.name configuration property, while kafkaPartition is null, which means that the connector does not use a specific Kafka partition.

The sourceOffset portion of the message contains information about the location on the server where the event occurred:

  • lsn represents the PostgreSQL log sequence number or offset in the transaction log

  • txId represents the identifier of the server transaction which caused the event

  • ts_usec represents the server time at which the transaction was committed, as the number of microseconds since the Unix epoch
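Because ts_usec is a count of microseconds (and arrives as a string in the example above), converting it with integer arithmetic avoids the rounding that a float division could introduce. A minimal sketch using only the standard library; `commit_time` is a hypothetical helper name:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def commit_time(ts_usec: str) -> datetime:
    """Convert a ts_usec value (microseconds since the Unix epoch) to a UTC datetime."""
    return EPOCH + timedelta(microseconds=int(ts_usec))
```

For the example offset above, `commit_time("1482918357011699")` gives 2016-12-28 09:45:57.011699 UTC.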

Events

All data change events produced by the PostgreSQL connector have a key and a value, although the structure of the key and value depends on the table from which the change events originated (see Topic names).

Starting with Kafka 0.10, Kafka can optionally record with the message key and value the timestamp at which the message was created (recorded by the producer) or written to the log by Kafka.

The Debezium PostgreSQL connector ensures that all Kafka Connect schema names are valid Avro schema names. This means that the logical server name must start with a Latin letter or an underscore (i.e., [a-zA-Z_]), and the remaining characters in the logical server name and all characters in the schema and table names must be Latin letters, digits, or an underscore (i.e., [a-zA-Z0-9_]). Any invalid characters are automatically replaced with an underscore character.

This can lead to unexpected conflicts when the logical server name, schema names, or table names contain other characters, and the only characters that distinguish two full table names are invalid and are therefore replaced with underscores.
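The replacement rule, and the kind of collision it can cause, can be sketched as follows. This is a reading of the rule described above, not the connector’s own code; `sanitize` and `schema_full_name` are hypothetical helpers.

```python
import re

_INVALID = re.compile(r"[^A-Za-z0-9_]")


def sanitize(name: str, check_first: bool = False) -> str:
    """Replace characters that are invalid in an Avro name with '_'.

    check_first applies the extra rule for the logical server name: its
    first character must be a Latin letter or an underscore.
    """
    cleaned = _INVALID.sub("_", name)
    if check_first and cleaned and cleaned[0].isdigit():
        cleaned = "_" + cleaned[1:]
    return cleaned


def schema_full_name(server: str, schema: str, table: str) -> str:
    """serverName.schemaName.tableName with each part sanitized."""
    return ".".join([sanitize(server, check_first=True), sanitize(schema), sanitize(table)])
```

With these rules, tables named order-items and order_items in the same schema both map to fulfillment.inventory.order_items, which is exactly the conflict described above.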

Debezium and Kafka Connect are designed around continuous streams of event messages, and the structure of these events may change over time. This could be difficult for consumers to deal with, so to make it easier, Kafka Connect makes each event self-contained. Every message key and value has two parts: a schema and a payload. The schema describes the structure of the payload, while the payload contains the actual data.

Change event’s key

For a given table, the change event’s key will have a structure that contains a field for each column in the primary key (or unique key constraint) of the table at the time the event was created.

Consider a customers table defined in the public database schema:

CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

If the database.server.name configuration property has the value PostgreSQL_server, every change event for the customers table while it has this definition will feature the same key structure, which in JSON looks like this:

{
    "schema": {
        "type": "struct",
        "name": "PostgreSQL_server.public.customers.Key",
        "optional": false,
        "fields": [
            {
                "name": "id",
                "index": "0",
                "schema": {
                    "type": "INT32",
                    "optional": "false"
                }
            }
        ]
    },
    "payload": {
        "id": 1
    }
}

The schema portion of the key contains a Kafka Connect schema describing what is in the key portion, and in our case that means that the payload value is not optional, is a structure defined by a schema named PostgreSQL_server.public.customers.Key, and has one required field named id of type int32. If we look at the value of the key’s payload field, we’ll see that it is indeed a structure (which in JSON is just an object) with a single id field, whose value is 1.

Therefore, we interpret this key as describing the row in the public.customers table (output from the connector named PostgreSQL_server) whose id primary key column had a value of 1.

Although the column.blacklist configuration property allows you to remove columns from the event values, all columns in a primary or unique key are always included in the event’s key.

If the table does not have a primary or unique key, then the change event’s key will be null. This makes sense since the rows in a table without a primary or unique key constraint cannot be uniquely identified.
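The key rules in this section (one field per primary or unique key column, key columns always present, a null key when there is no such key) can be summarized in a small sketch. `change_event_key` is a hypothetical helper operating on plain dicts, not a Debezium API.

```python
from typing import Any, Dict, List, Optional


def change_event_key(row: Dict[str, Any], key_columns: List[str]) -> Optional[Dict[str, Any]]:
    """Build the key payload for a change event on one row."""
    if not key_columns:
        # No primary or unique key: rows cannot be identified, so the key is null.
        return None
    # Key columns are always included, even if column.blacklist hides them from the value.
    return {col: row[col] for col in key_columns}
```

For the customers row with id 1, this produces `{"id": 1}`, matching the payload of the key shown earlier.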

Change event’s value

The value of the change event message is a bit more complicated. Like the key message, it has a schema section and payload section. The payload section of every change event value produced by the PostgreSQL connector has an envelope structure with the following fields:

  • op is a mandatory field that contains a string value describing the type of operation. Values for the PostgreSQL connector are c for create (or insert), u for update, d for delete, and r for read (in the case of a snapshot).

  • before is an optional field that if present contains the state of the row before the event occurred. The structure will be described by the PostgreSQL_server.public.customers.Value Kafka Connect schema, which the PostgreSQL_server connector uses for all rows in the public.customers table.

Whether or not this field is available is highly dependent on the REPLICA IDENTITY setting of each table.

  • after is an optional field that if present contains the state of the row after the event occurred. The structure is described by the same PostgreSQL_server.public.customers.Value Kafka Connect schema used in before.

  • source is a mandatory field that contains a structure describing the source metadata for the event, which in the case of PostgreSQL contains several fields: the connector name, whether or not the event is part of an ongoing snapshot, and the same fields as in the record’s meta information section.

  • ts_ms is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.

And of course, the schema portion of the event message’s value contains a schema that describes this envelope structure and the nested fields within it.

Replica Identity

REPLICA IDENTITY is a PostgreSQL-specific table-level setting that determines the amount of information available to logical decoding in the case of UPDATE and DELETE events. More specifically, it controls what (if any) information is available about the previous values of the table columns involved whenever one of these events occurs.

There are 4 possible values for REPLICA IDENTITY:

  • DEFAULT - UPDATE and DELETE events will only contain the previous values for the primary key columns of a table

  • NOTHING - UPDATE and DELETE events will not contain any information about the previous value on any of the table columns

  • FULL - UPDATE and DELETE events will contain the previous values of all the table’s columns

  • USING INDEX index_name - UPDATE and DELETE events will contain the previous values of the columns contained in the definition of the index named index_name
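The four settings can be read as a function from the old row to the "before" image that logical decoding exposes. The sketch below models the list above and nothing more; `before_image` is a hypothetical helper, and it represents "no previous values available" as None.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def before_image(identity: str, old_row: Row, pk_columns: List[str],
                 index_columns: Optional[List[str]] = None) -> Optional[Row]:
    """Previous column values available for UPDATE/DELETE under a REPLICA IDENTITY."""
    if identity == "FULL":
        return dict(old_row)
    if identity == "NOTHING":
        return None
    if identity == "DEFAULT":
        columns = pk_columns
    elif identity == "INDEX":
        columns = index_columns or []
    else:
        raise ValueError(f"unknown REPLICA IDENTITY {identity!r}")
    # An empty selection (e.g. DEFAULT on a table without a primary key) means nothing is known.
    return {c: old_row[c] for c in columns} or None
```

Under DEFAULT, the customers table yields only `{"id": 1}` as the previous state, which is what the update and delete examples below show.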

Create events

Let’s look at what a create event value might look like for our customers table:

{
    "schema": {
        "type": "struct",
        "optional": false,
        "name": "PostgreSQL_server.public.customers.Envelope",
        "version": 1,
        "fields": [
            {
                "field": "op",
                "type": "string",
                "optional": false
            },
            {
                "field": "before",
                "type": "struct",
                "optional": true,
                "name": "PostgreSQL_server.public.customers.Value",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    }
                ]
            },
            {
                "field": "after",
                "type": "struct",
                "name": "PostgreSQL_server.public.customers.Value",
                "optional": true,
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ]
            },
            {
                "field": "source",
                "type": "struct",
                "name": "io.debezium.connector.postgresql.Source",
                "optional": false,
                "fields": [
                    {
                        "name": "name",
                        "index": "0",
                        "schema": {
                            "type": "STRING",
                            "optional": "false"
                        }
                    },
                    {
                        "name": "ts_usec",
                        "index": "1",
                        "schema": {
                            "type": "INT64",
                            "optional": "true"
                        }
                    },
                    {
                        "name": "txId",
                        "index": "2",
                        "schema": {
                            "type": "INT32",
                            "optional": "true"
                        }
                    },
                    {
                        "name": "lsn",
                        "index": "3",
                        "schema": {
                            "type": "INT64",
                            "optional": "true"
                        }
                    },
                    {
                        "name": "snapshot",
                        "index": "4",
                        "schema": {
                            "type": "BOOLEAN",
                            "optional": "true"
                        }
                    },
                    {
                        "name": "last_snapshot_record",
                        "index": "5",
                        "schema": {
                            "type": "BOOLEAN",
                            "optional": "true"
                        }
                    }
                ]
            },
            {
                "field": "ts_ms",
                "type": "int64",
                "optional": true
            }
        ]
    },
    "payload": {
        "op": "c",
        "ts_ms": 1465491411815,
        "before": null,
        "after": {
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": {
            "name": "PostgreSQL_server",
            "ts_usec": "1482918357011699",
            "txId": "555",
            "lsn": "24023128",
            "snapshot": null,
            "last_snapshot_record": null
        }
    }
}

If we look at the schema portion of this event’s value, we can see the schema for the envelope, the schema for the source structure (which is specific to the PostgreSQL connector and reused across all events), and the table-specific schemas for the before and after fields.

The names of the schemas for the before and after fields are of the form logicalName.schemaName.tableName.Value, and thus are entirely independent of all other schemas for all other tables. This means that when using the Avro Converter, the resulting Avro schemas for each table in each logical source have their own evolution and history.

If we look at the payload portion of this event’s value, we can see the information in the event, namely that it is describing that the row was created (since op=c), and that the after field value contains the values of the newly inserted row’s id, first_name, last_name, and email columns.

It may appear that the JSON representations of the events are much larger than the rows they describe. This is true, because the JSON representation must include the schema and the payload portions of the message. It is possible and even recommended to use the Avro Converter to dramatically decrease the size of the actual messages written to the Kafka topics.

Update events

The value of an update change event on this table will actually have the exact same schema, and its payload will be structured the same but will hold different values. Here’s an example, formatted to be easier to read:

{
    "schema": { ... },
    "payload": {
        "before": {
            "id": 1
        },
        "after": {
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": {
            "name": "PostgreSQL_server",
            "ts_usec": "1482918357011699",
            "txId": "556",
            "lsn": "26523128",
            "snapshot": null,
            "last_snapshot_record": null
        },
        "op": "u",
        "ts_ms": 1465584025523
    }
}

When we compare this to the value in the insert event, we see a couple of differences in the payload section:

  • The op field value is now u, signifying that this row changed because of an update

  • The before field now has the state of the row with the values before the database commit, but only for the primary key column id. This is because of the table’s REPLICA IDENTITY setting, which is DEFAULT by default.

Should we want to see the previous values of all the columns for the row, we would first have to change the customers table by running ALTER TABLE customers REPLICA IDENTITY FULL.

  • The after field now has the updated state of the row, and here we can see that the first_name value is now Anne Marie.

  • The source field structure has the same fields as before, but the values are different since this event is from a different position in the WAL.

  • The ts_ms shows the timestamp that Debezium processed this event.

There are several things we can learn by just looking at this payload section. We can compare the before and after structures to determine what actually changed in this row because of the commit. The source structure tells us information about PostgreSQL’s record of this change (providing traceability), but more importantly this has information we can compare to other events in this and other topics to know whether this event occurred before, after, or as part of the same PostgreSQL commit as other events.
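Comparing before and after needs one caveat: with REPLICA IDENTITY DEFAULT, before holds only the key, so for the other columns a consumer cannot tell whether they changed. The sketch below (a hypothetical `diff_row` helper on plain dicts) reports changed columns separately from columns whose previous value is unknown.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def diff_row(before: Optional[Row], after: Row) -> Tuple[Dict[str, Tuple[Any, Any]], List[str]]:
    """Return (changed, unknown) for an update event.

    changed maps column -> (old, new) for columns present in both images
    whose values differ; unknown lists columns with no previous value.
    """
    before = before or {}
    changed = {c: (before[c], v) for c, v in after.items() if c in before and before[c] != v}
    unknown = [c for c in after if c not in before]
    return changed, unknown
```

Applied to the update event above, no column is reported as changed and first_name, last_name, and email are reported as unknown; with REPLICA IDENTITY FULL, the same update would report first_name changing from Anne to Anne Marie.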

When the columns for a row’s primary/unique key are updated, the value of the row’s key has changed, so Debezium will output three events: a DELETE event and a tombstone event with the old key for the row, followed by an INSERT event with the new key for the row.
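The sketch below shows the resulting sequence of (key, value) messages. It is a simplification: the envelopes carry only op, before, and after (no source or ts_ms), the INSERT is represented with op c, and `events_for_update` is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Message = Tuple[Row, Optional[Row]]  # (key, value); a None value is a tombstone


def events_for_update(old_key: Row, new_key: Row, before: Row, after: Row) -> List[Message]:
    """Messages emitted for an UPDATE, depending on whether the key changed."""
    if old_key == new_key:
        return [(new_key, {"op": "u", "before": before, "after": after})]
    return [
        (old_key, {"op": "d", "before": before, "after": None}),
        (old_key, None),  # tombstone with the old key
        (new_key, {"op": "c", "before": None, "after": after}),
    ]
```

Splitting the change this way keeps every message under the key that actually identifies the row, so consumers and log compaction never see one key’s history mixed with another’s.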

Delete events

So far we’ve seen samples of create and update events. Now, let’s look at the value of a delete event for the same table. Once again, the schema portion of the value will be exactly the same as with the create and update events:

{
    "schema": { ... },
    "payload": {
        "before": {
            "id": 1
        },
        "after": null,
        "source": {
            "name": "PostgreSQL_server",
            "ts_usec": "154918657011699",
            "txId": "557",
            "lsn": "46523128",
            "snapshot": null,
            "last_snapshot_record": null
        },
        "op": "d",
        "ts_ms": 1465581902461
    }
}

If we look at the payload portion, we see a number of differences compared with the create or update event payloads:

  • The op field value is now d, signifying that this row was deleted

  • The before field now has the state of the row that was deleted with the database commit. Again this only contains the primary key column due to the REPLICA IDENTITY setting

  • The after field is null, signifying that the row no longer exists

  • The source field structure has many of the same values as before, except the ts_usec, lsn and txId fields have changed

  • The ts_ms shows the timestamp that Debezium processed this event.

This event gives a consumer all kinds of information that it can use to process the removal of this row.
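For example, a consumer that keeps an in-memory copy of a table could apply create, read, update, and delete events as sketched below. This is an illustration over already-deserialized dicts, not a Debezium client API; `apply_event` is a hypothetical name, and tables without a primary or unique key (null keys) are rejected because their rows cannot be addressed.

```python
from typing import Any, Dict, Optional, Tuple

Row = Dict[str, Any]


def _row_id(key: Row) -> Tuple:
    # Dicts are not hashable; a sorted tuple of the key's items identifies the row.
    return tuple(sorted(key.items()))


def apply_event(table: Dict[Tuple, Row], key: Optional[Row], value: Optional[Row]) -> None:
    """Apply one change event to an in-memory copy of a table."""
    if value is None:
        return  # tombstone: only relevant to log compaction
    if key is None:
        raise ValueError("rows without a primary or unique key cannot be tracked by key")
    op = value["op"]
    if op in ("c", "r", "u"):
        table[_row_id(key)] = value["after"]
    elif op == "d":
        table.pop(_row_id(key), None)
    else:
        raise ValueError(f"unexpected op {op!r}")
```

Because the envelope’s op field already says what happened, the consumer never has to infer the operation by comparing before and after.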

The PostgreSQL connector’s events are designed to work with Kafka log compaction, which allows for the removal of some older messages as long as at least the most recent message for every key is kept. This allows Kafka to reclaim storage space while ensuring the topic contains a complete dataset and can be used for reloading key-based state.

When a row is deleted, the delete event value listed above still works with log compaction, since Kafka can still remove all earlier messages with that same key. But only if the message value is null will Kafka know that it can remove all messages with that same key. To make this possible, Debezium’s PostgreSQL connector always follows the delete event with a special tombstone event that has the same key but null value.
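The interaction between delete events, tombstones, and compaction can be modeled in a few lines. This is a simplified Python sketch, not Kafka's implementation: it keeps only the latest message per key and drops a key whose latest message has a null value. Real Kafka keeps tombstones for a configurable period (the delete.retention.ms topic setting) before removing them, which this sketch ignores; the function name compact is hypothetical.

```python
from typing import Hashable, List, Optional, Tuple

Message = Tuple[Hashable, Optional[str]]  # (key, value); a None value is a tombstone


def compact(log: List[Message]) -> List[Message]:
    """Keep only the most recent message per key; drop keys whose latest value is null."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)  # later offsets overwrite earlier ones
    survivors = [(offset, key, value) for key, (offset, value) in latest.items() if value is not None]
    survivors.sort(key=lambda item: item[0])  # keep the original log order
    return [(key, value) for _, key, value in survivors]


log = [(1, "create"), (2, "create"), (1, "update"), (1, "delete"), (1, None)]
print(compact(log))  # [(2, 'create')]
# Without the tombstone, the delete event itself is the latest message for key 1 and survives.
print(compact(log[:-1]))  # [(2, 'create'), (1, 'delete')]
```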

As of Kafka 0.10, the JSON converter provided by Kafka Connect never results in a null value for the message (KAFKA-3832). Therefore, Kafka’s log compaction will always retain the last message, even when the tombstone event is supplied, though it will be free to remove all prior messages with the same key. In other words, until this is fixed, using the JSON Converter will reduce the effectiveness of Kafka’s log compaction.

In the meantime, consider using the Avro Converter, which does properly return a null value and will thus take full advantage of Kafka log compaction.

Data types

As described above, the PostgreSQL connector represents the changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value, and how that value is represented in the event depends on the PostgreSQL data type of the column. This section describes this mapping.

The following table describes how the connector maps each of the PostgreSQL data types to a literal type and semantic type within the events' fields.

Here, the literal type describes how the value is literally represented using Kafka Connect schema types, namely INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, and STRUCT.

The semantic type describes how the Kafka Connect schema captures the meaning of the field using the name of the Kafka Connect schema for the field.

PostgreSQL Data Type | Literal type (schema type) | Semantic type (schema name) | Notes

POINT | STRUCT | io.debezium.data.geometry.Point | Contains a structure with two FLOAT64 fields, (x, y), each representing a coordinate of a geometric point
BOOLEAN | BOOLEAN | n/a
BIT(1) | BOOLEAN | n/a
BIT(M > 1), BIT VARYING[(M)] | BYTES | io.debezium.data.Bits | The length schema parameter contains an integer representing the number of bits. The resulting byte[] contains the bits in little-endian form and is sized to contain at least the specified number of bits: numBytes = n/8 + (n%8 == 0 ? 0 : 1), where n is the number of bits.
SMALLINT, SMALLSERIAL | INT16 | n/a
INTEGER, SERIAL | INT32 | n/a
BIGINT, BIGSERIAL | INT64 | n/a
REAL | FLOAT32 | n/a
DOUBLE PRECISION | FLOAT64 | n/a
CHAR[(M)] | STRING | n/a
VARCHAR[(M)] | STRING | n/a
CHARACTER[(M)] | STRING | n/a
CHARACTER VARYING[(M)] | STRING | n/a
TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | STRING | io.debezium.time.ZonedTimestamp | A string representation of a timestamp with time zone information, where the time zone is GMT
TIMETZ, TIME WITH TIME ZONE | STRING | io.debezium.time.ZonedTime | A string representation of a time value with time zone information, where the time zone is GMT
INTERVAL [P] | FLOAT64 | io.debezium.time.MicroDuration | The number of microseconds for a time interval, using an average of 365.25 / 12.0 days per month
BYTEA | BYTES | n/a
JSON, JSONB | STRING | io.debezium.data.Json | Contains the string representation of a JSON document, array, or scalar
XML | STRING | io.debezium.data.Xml | Contains the string representation of an XML document
UUID | STRING | io.debezium.data.Uuid | Contains the string representation of a PostgreSQL UUID value
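The notes for the io.debezium.data.Bits and io.debezium.time.MicroDuration mappings above describe arithmetic that a consumer may need to reproduce. The Python sketch below is illustrative only: it assumes bit i is stored at position i % 8 of byte i // 8 (the little-endian layout produced by Java's BitSet.toByteArray()), reads any missing trailing bytes as zero, and applies the 365.25 / 12.0 days-per-month average to an interval given as separate fields. All function names are hypothetical.

```python
from typing import List


def bits_num_bytes(n: int) -> int:
    """Bytes needed for n bits: n/8 + (n%8 == 0 ? 0 : 1)."""
    return n // 8 + (0 if n % 8 == 0 else 1)


def decode_bits(data: bytes, length: int) -> List[int]:
    """Return bit values 0..length-1 of a Bits value (assumed little-endian layout)."""
    bits = []
    for i in range(length):
        # Missing trailing bytes are treated as zero (an assumption of this sketch).
        byte = data[i // 8] if i // 8 < len(data) else 0
        bits.append((byte >> (i % 8)) & 1)
    return bits


DAYS_PER_MONTH_AVG = 365.25 / 12.0  # 30.4375


def interval_to_micros(years: int = 0, months: int = 0, days: int = 0,
                       hours: int = 0, minutes: int = 0, seconds: float = 0.0) -> float:
    """Approximate an INTERVAL as microseconds, counting each month as 365.25 / 12.0 days."""
    total_days = (years * 12 + months) * DAYS_PER_MONTH_AVG + days
    total_seconds = ((total_days * 24 + hours) * 60 + minutes) * 60 + seconds
    return total_seconds * 1_000_000


print(bits_num_bytes(9))                    # 2
print(decode_bits(bytes([0b00000101]), 3))  # [1, 0, 1]
print(interval_to_micros(months=1))         # 2629800000000.0
```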

Other data type mappings are described in the following sections.

Temporal values

Other than PostgreSQL’s TIMESTAMPTZ and TIMETZ data types (which contain time zone information), the temporal types depend on the value of the time.precision.mode configuration property. When the time.precision.mode configuration property is set to adaptive (the default), the connector will determine the literal type and semantic type for the temporal types based on the column’s data type definition, so that events exactly represent the values in the database:

PostgreSQL Data Type | Literal type (schema type) | Semantic type (schema name) | Notes

DATE | INT32 | io.debezium.time.Date | Represents the number of days since epoch.
TIME(1), TIME(2), TIME(3) | INT32 | io.debezium.time.Time | Represents the number of milliseconds past midnight, and does not include time zone information.
TIME(4), TIME(5), TIME(6) | INT64 | io.debezium.time.MicroTime | Represents the number of microseconds past midnight, and does not include time zone information.
TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | INT64 | io.debezium.time.Timestamp | Represents the number of milliseconds past epoch, and does not include time zone information.
TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6) | INT64 | io.debezium.time.MicroTimestamp | Represents the number of microseconds past epoch, and does not include time zone information.
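A consumer working with these adaptive-mode values has to know which unit each semantic type uses. The following Python sketch converts each of them to a standard-library date or time object; it returns naive values because these types carry no time zone, and the function names are hypothetical rather than part of any Debezium API.

```python
from datetime import date, datetime, time, timedelta

EPOCH = datetime(1970, 1, 1)  # naive: these semantic types carry no time zone


def from_date(days: int) -> date:
    """io.debezium.time.Date: days since epoch."""
    return EPOCH.date() + timedelta(days=days)


def from_time(millis: int) -> time:
    """io.debezium.time.Time: milliseconds past midnight."""
    return (datetime.min + timedelta(milliseconds=millis)).time()


def from_micro_time(micros: int) -> time:
    """io.debezium.time.MicroTime: microseconds past midnight."""
    return (datetime.min + timedelta(microseconds=micros)).time()


def from_timestamp(millis: int) -> datetime:
    """io.debezium.time.Timestamp: milliseconds past epoch."""
    return EPOCH + timedelta(milliseconds=millis)


def from_micro_timestamp(micros: int) -> datetime:
    """io.debezium.time.MicroTimestamp: microseconds past epoch."""
    return EPOCH + timedelta(microseconds=micros)


print(from_date(17167))                 # 2017-01-01
print(from_micro_timestamp(1_500_000))  # 1970-01-01 00:00:01.500000
```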

When the time.precision.mode configuration property is set to connect, the connector will use the predefined Kafka Connect logical types. This may be useful when consumers only know about the built-in Kafka Connect logical types and are unable to handle variable-precision time values. On the other hand, since PostgreSQL supports microsecond precision, the events generated by a connector with the connect time precision mode will result in a loss of precision when the database column has a fractional second precision value greater than 3:

PostgreSQL Data Type | Literal type (schema type) | Semantic type (schema name) | Notes

TIMESTAMP([P]) | INT64 | org.apache.kafka.connect.data.Timestamp | Represents the number of milliseconds since epoch, and does not include time zone information. PostgreSQL allows P to be in the range 0-6 to store up to microsecond precision, though this mode results in a loss of precision when P > 3.
DATE | INT32 | org.apache.kafka.connect.data.Date | Represents the number of days since epoch.
TIME([P]) | INT32 | org.apache.kafka.connect.data.Time | Represents the number of milliseconds since midnight, and does not include time zone information. PostgreSQL allows P to be in the range 0-6 to store up to microsecond precision, though this mode results in a loss of precision when P > 3.
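The loss of precision in connect mode is easy to see with a concrete TIMESTAMP(6) value. This Python sketch reduces a naive datetime to milliseconds since epoch and converts it back; it assumes the extra digits are truncated rather than rounded, which is an assumption of the sketch, and connect_timestamp_millis is a hypothetical name.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def connect_timestamp_millis(value: datetime) -> int:
    """Milliseconds since epoch for a naive datetime, discarding sub-millisecond digits."""
    delta = value - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000 + delta.microseconds // 1_000


original = datetime(2017, 1, 1, 12, 0, 0, 123456)  # e.g. a TIMESTAMP(6) value
millis = connect_timestamp_millis(original)
restored = EPOCH + timedelta(milliseconds=millis)
print(restored)  # 2017-01-01 12:00:00.123000
```

The last three digits of the fractional second (456 microseconds) cannot be recovered from the event, which is exactly the P > 3 case noted in the table.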

Decimal values

When the decimal.handling.mode configuration property is set to precise, the connector will use the predefined Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL and NUMERIC columns. This is the default mode.

PostgreSQL Data Type | Literal type (schema type) | Semantic type (schema name) | Notes

DECIMAL[(M[,D])] | BYTES | org.apache.kafka.connect.data.Decimal | The scale schema parameter contains an integer representing how many digits the decimal point was shifted.
NUMERIC[(M[,D])] | BYTES | org.apache.kafka.connect.data.Decimal | The scale schema parameter contains an integer representing how many digits the decimal point was shifted.
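In precise mode the BYTES value holds the unscaled integer, and the scale parameter says where the decimal point goes. The Python sketch below assumes the unscaled value is encoded as a big-endian two's-complement integer (the layout of Java's BigInteger.toByteArray(), which Kafka Connect's Decimal type uses). It builds the Decimal from its (sign, digits, exponent) tuple so that no context rounding can occur; both function names are hypothetical.

```python
from decimal import Decimal
from typing import Tuple


def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    """Decode a Decimal field: big-endian two's-complement unscaled value, shifted by scale."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    digits = tuple(int(d) for d in str(abs(unscaled)))
    # Build from (sign, digits, exponent) so no context rounding is applied.
    return Decimal((1 if unscaled < 0 else 0, digits, -scale))


def encode_connect_decimal(value: Decimal) -> Tuple[bytes, int]:
    """Inverse of decode_connect_decimal, returning (raw bytes, scale)."""
    sign, digits, exponent = value.as_tuple()
    unscaled = int("".join(map(str, digits))) * (-1 if sign else 1)
    length = (unscaled.bit_length() + 8) // 8  # minimal two's-complement length
    return unscaled.to_bytes(length, byteorder="big", signed=True), -exponent


print(decode_connect_decimal(bytes([0x30, 0x39]), 2))  # 123.45
print(encode_connect_decimal(Decimal("-123.45")))      # (b'\xcf\xc7', 2)
```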

However, when the decimal.handling.mode configuration property is set to double, the connector will represent all DECIMAL and NUMERIC values as Java double values and encode them as follows:

PostgreSQL Data Type | Literal type (schema type) | Semantic type (schema name) | Notes

DECIMAL[(M[,D])] | FLOAT64 | n/a
NUMERIC[(M[,D])] | FLOAT64 | n/a
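The trade-off of double mode is that consumers get a plain floating-point number but may lose digits. This small Python sketch illustrates what a consumer would see, using Python's float as a stand-in for a Java double (both are IEEE 754 64-bit values); as_double is a hypothetical name.

```python
from decimal import Decimal


def as_double(value: Decimal) -> float:
    """Value a consumer receives in double mode: the nearest IEEE 754 double."""
    return float(value)


exact = Decimal("12345678901234567890.12")
print(Decimal(as_double(exact)) == exact)  # False: digits beyond double precision are lost
print(Decimal(as_double(Decimal("0.5"))) == Decimal("0.5"))  # True: 0.5 is exactly representable
```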

PostGIS types

The PostgreSQL connector also has limited support for some of the PostGIS data types:

PostGIS Data Type | Literal type (schema type) | Semantic type (schema name) | Notes

POINT | STRUCT | io.debezium.data.geometry.Point | Contains a structure with two FLOAT64 fields, (x, y), each representing a coordinate of a geometric point

When things go wrong

Debezium is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. Of course, when the system is operating nominally or being administered carefully, then Debezium provides exactly once delivery of every change event. However, if a fault does happen then the system will still not lose any events, although while it is recovering from the fault it may repeat some change events. Thus, in these abnormal situations Debezium (like Kafka) provides at least once delivery of change events.

The rest of this section describes how Debezium handles various kinds of faults and problems.

Configuration and startup errors

The connector will fail upon startup, report an error/exception in the log, and stop running when the connector’s configuration is invalid, when the connector cannot successfully connect to PostgreSQL using the specified connectivity parameters, or when the connector is restarting from a previously-recorded position in the PostgreSQL WAL (via the LSN value) and PostgreSQL no longer has that history available.

In these cases, the error will have more details about the problem and possibly a suggested workaround. The connector can be restarted once the configuration has been corrected or the PostgreSQL problem has been addressed.

PostgreSQL becomes unavailable

Once the connector is running, if the PostgreSQL server it has been connected to becomes unavailable for any reason, the connector will fail with an error and the connector will stop. Simply restart the connector when the server is available.

The PostgreSQL connector externally stores the last processed offset (in the form of a PostgreSQL log sequence number value). When the connector is restarted and connects to a server instance, if it has a previously stored offset it will ask the server to continue streaming from that particular offset. However, depending on the server configuration, this particular offset may or may not be available in the server’s write-ahead log segments. If it is available, the connector will simply resume streaming changes without missing anything. If it is not available, the connector cannot replay the changes that occurred while it was not online.

Cluster failures

As of 9.6, PostgreSQL allows logical replication slots only on master servers, which means that a PostgreSQL connector can only be pointed to the active master of a database cluster. If this machine goes down, only after a new master has been promoted (with the logical decoding plugin installed) can the connector be restarted and pointed to the new server.

One potential issue with this is that if there’s a large enough delay between the new server’s promotion and the installation of the plugin together with the restart of the connector, the PostgreSQL server may have removed some WAL information. If this happens, the connector will miss out on all the changes that took place after the election of the new master and before the restart of the connector.

There are discussions in the PostgreSQL community around a feature called failover slots, which would help mitigate this problem, but as of 9.6 they have not been implemented. You can find out more about this particular issue in this blog post.

Kafka Connect process stops gracefully

If Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully, then prior to shutting down that process Kafka Connect will migrate all of the process’s connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off. There will be a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.

Kafka Connect process crashes

If the Kafka Connect process stops unexpectedly, then any connector tasks it was running will terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes. However, the PostgreSQL connectors will resume from the last offset recorded by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash. The number of duplicate events will depend on the offset flush period and the volume of data changes just before the crash.

Because some events may be duplicated during recovery from a failure, consumers should always anticipate duplicates. Debezium change events are idempotent, so applying a sequence of events always results in the same state.

Debezium also includes with each change event message the source-specific information about the origin of the event, including the PostgreSQL server’s time of the event, the id of the server transaction and the position in the write-ahead log where the transaction changes were written. Consumers can keep track of this information (especially the LSN position) to know whether they have already seen a particular event.
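One way a consumer might use the LSN to skip replayed events is sketched below in Python. It remembers the highest source.lsn applied for each message key and rejects anything not newer. This assumes every change event carries a distinct LSN that increases over time; if several events for the same key could share one LSN, this filter would wrongly drop all but the first. The class name LsnDeduplicator is hypothetical.

```python
from typing import Dict, Hashable


class LsnDeduplicator:
    """Hypothetical consumer-side filter for events replayed after a crash.

    Remembers the highest source.lsn applied per message key and reports
    whether a newly received event is newer than that.
    """

    def __init__(self) -> None:
        self._last_lsn: Dict[Hashable, int] = {}

    def is_new(self, key: Hashable, source: dict) -> bool:
        lsn = int(source["lsn"])  # the sample events carry the LSN as a string
        last = self._last_lsn.get(key)
        if last is not None and lsn <= last:
            return False  # already seen (or older): treat as a duplicate
        self._last_lsn[key] = lsn
        return True


dedup = LsnDeduplicator()
print(dedup.is_new(1, {"lsn": "46523128"}))  # True
print(dedup.is_new(1, {"lsn": "46523128"}))  # False (replayed after a restart)
```

Tracking the LSN per key rather than globally keeps the filter correct even if the consumer reads several topics whose events interleave out of LSN order.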

Kafka becomes unavailable

As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency you’ve specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be re-established, at which point the connectors will resume exactly where they left off.

Connector is stopped for a duration

If the connector is gracefully stopped, the database can continue to be used and any new changes will be recorded in the PostgreSQL WAL. When the connector is restarted, it will resume streaming changes where it last left off, recording change events for all of the changes that were made while the connector was stopped.

A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written with Kafka best practices, and given enough resources will also be able to handle very large numbers of database change events. Because of this, when a connector has been restarted after a while, it is very likely to catch up with the database, though how quickly will depend upon the capabilities and performance of Kafka and the volume of changes being made to the data in PostgreSQL.

If the connector remains stopped for long enough, PostgreSQL might purge older WAL segments and the connector’s last known position may be lost. In this case, when a connector configured with the initial snapshot mode (the default) is finally restarted, the PostgreSQL server will no longer have the starting point and the connector will perform an initial snapshot. On the other hand, if snapshots are disabled for the connector (snapshot mode never), the connector will fail with an error.
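The restart behavior described here, combined with the snapshot.mode options listed under the advanced connector properties, can be summarized as a small decision function. This Python sketch is a hypothetical model of the documented behavior, not the connector's code; in particular, treating always and initial_only as independent of any stored offset is an assumption.

```python
def startup_action(snapshot_mode: str, has_offset: bool, wal_available: bool) -> str:
    """Hypothetical model of how snapshot mode and stored offset decide what happens at startup."""
    if snapshot_mode == "always":
        return "snapshot_then_stream"
    if snapshot_mode == "initial_only":
        return "snapshot_then_stop"
    if snapshot_mode == "initial":
        # No recorded offset, or the recorded LSN was purged from the WAL: start over with a snapshot.
        if not has_offset or not wal_available:
            return "snapshot_then_stream"
        return "stream"
    if snapshot_mode == "never":
        # Without snapshots, a lost position cannot be recovered.
        if has_offset and not wal_available:
            return "fail"
        return "stream"
    raise ValueError(f"unknown snapshot mode: {snapshot_mode!r}")


print(startup_action("initial", has_offset=True, wal_available=False))  # snapshot_then_stream
print(startup_action("never", has_offset=True, wal_available=False))    # fail
```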

Deploying a connector

If you’ve already installed Zookeeper, Kafka, and Kafka Connect, then using Debezium’s PostgreSQL connector is easy. Simply download the connector’s plugin archive, extract the JARs into your Kafka Connect environment, and add the directory with the JARs to Kafka Connect’s classpath. Restart your Kafka Connect process to pick up the new JARs.

If immutable containers are your thing, then check out Debezium’s Docker images for Zookeeper, Kafka, PostgreSQL and Kafka Connect with the PostgreSQL connector already pre-installed and ready to go. You can even run Debezium on Kubernetes and OpenShift.

To use the connector to produce change events for a particular PostgreSQL server or cluster:

  1. Install the logical decoding plugin.

  2. Configure the PostgreSQL server to support logical replication.

  3. Create a configuration file for the PostgreSQL connector and use the Kafka Connect REST API to add that connector to your Kafka Connect cluster.

When the connector starts, it will grab a consistent snapshot of the databases in your PostgreSQL server and start streaming changes, producing events for every inserted, updated, and deleted row. You can also choose to produce events for a subset of the schemas and tables, and optionally ignore, mask, or truncate columns that are sensitive, too large, or not needed.

Example configuration

Using the PostgreSQL connector is straightforward. Here is an example of the configuration for a PostgreSQL connector that monitors a PostgreSQL server at port 5432 on 192.168.99.100, which we logically name fullfillment:

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "5432", (4)
    "database.user": "postgres", (5)
    "database.password": "postgres", (6)
    "database.dbname" : "postgres", (7)
    "database.server.name": "fullfillment", (8)
    "table.whitelist": "public.inventory" (9)

  }
}
1 The name of our connector when we register it with a Kafka Connect service.
2 The name of this PostgreSQL connector class.
3 The address of the PostgreSQL server.
4 The port number of the PostgreSQL server.
5 The name of the PostgreSQL user that has the required privileges.
6 The password for the PostgreSQL user that has the required privileges.
7 The name of the PostgreSQL database to connect to.
8 The logical name of the PostgreSQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Converter is used.
9 A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.

See the complete list of connector properties that can be specified in these configurations.

This configuration can be sent via POST to a running Kafka Connect service, which will then record the configuration and start up the one connector task that will connect to the PostgreSQL database and record events to Kafka topics.
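As a sketch of that POST, the following Python code uses only the standard library to send the example configuration (without its callout markers) to the Kafka Connect REST API's /connectors endpoint. The worker URL you pass in is an assumption about your environment, and the helper names build_register_request and register_connector are hypothetical.

```python
import json
import urllib.request

CONNECTOR = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "192.168.99.100",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "database.server.name": "fullfillment",
        "table.whitelist": "public.inventory",
    },
}


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST /connectors request carrying the connector's JSON configuration."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def register_connector(connect_url: str, connector: dict) -> str:
    """Send the request and return the response body (needs a reachable Kafka Connect worker)."""
    with urllib.request.urlopen(build_register_request(connect_url, connector)) as response:
        return response.read().decode("utf-8")
```

For example, register_connector("http://localhost:8083", CONNECTOR) would register the connector with a worker assumed to listen on localhost at port 8083, the usual default REST port.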

Connector properties

The following configuration properties are required unless a default value is available.

Property Default Description

database.tcpKeepAlive

Enable a TCP keep-alive probe to verify that the database connection is still alive (disabled by default). See the PostgreSQL documentation for more information.

name

Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)

connector.class

The name of the Java class for the connector. Always use a value of io.debezium.connector.postgresql.PostgresConnector for the PostgreSQL connector.

tasks.max

1

The maximum number of tasks that should be created for this connector. The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable.

plugin.name

decoderbufs

The name of the Postgres logical decoding plugin installed on the server.

slot.name

debezium

The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance.

slot.drop_on_stop

false

Whether or not to drop the logical replication slot when the connector shuts down in an orderly manner.

database.hostname

IP address or hostname of the PostgreSQL database server.

database.port

5432

Integer port number of the PostgreSQL database server.

database.user

Name of the PostgreSQL database user to use when connecting to the PostgreSQL database server.

database.password

Password to use when connecting to the PostgreSQL database server.

database.dbname

The name of the PostgreSQL database from which to stream the changes.

database.server.name

host:port/dbname

Logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names coming from this connector. Defaults to 'host:port/dbname', where host is the value of the database.hostname property, port is the value of the database.port property, and dbname is the value of the database.dbname property, though we recommend using an explicit and meaningful logical name.

schema.whitelist

An optional comma-separated list of regular expressions that match schema names to be monitored; any schema name not included in the whitelist will be excluded from monitoring. By default all non-system schemas will be monitored. May not be used with schema.blacklist.

schema.blacklist

An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring; any schema name not included in the blacklist will be monitored, with the exception of system schemas. May not be used with schema.whitelist.

table.whitelist

An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored; any table not included in the whitelist will be excluded from monitoring. Each identifier is of the form schemaName.tableName. By default the connector will monitor every non-system table in each monitored schema. May not be used with table.blacklist.

table.blacklist

An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring; any table not included in the blacklist will be monitored. Each identifier is of the form schemaName.tableName. May not be used with table.whitelist.

column.blacklist

An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.

time.precision.mode

adaptive

Time, date, and timestamps can be represented with different kinds of precision, including: adaptive (the default), which captures the time and timestamp values exactly as in the database, using either millisecond or microsecond precision values based on the database column’s type; or connect, which always represents time and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, with millisecond precision regardless of the database columns' precision. See temporal values.

database.sslmode

disabled

Whether to use an encrypted connection to the PostgreSQL server. Options include: disabled (the default) to use an unencrypted connection; required to use a secure (encrypted) connection, and fail if one cannot be established; verify_ca, like required but additionally verify the server TLS certificate against the configured Certificate Authority (CA) certificates, or fail if no valid matching CA certificates are found; verify_full, like verify_ca but additionally verify that the server certificate matches the host to which the connection is attempted. See the PostgreSQL documentation for more information.

database.sslcert

The path to the file containing the SSL Certificate for the client. See the PostgreSQL documentation for more information.

database.sslkey

The path to the file containing the SSL private key of the client. See the PostgreSQL documentation for more information.

database.sslpassword

The password to access the client private key from the file specified by database.sslkey. See the PostgreSQL documentation for more information.

database.sslrootcert

The path to the file containing the root certificate(s) against which the server is validated. See the PostgreSQL documentation for more information.
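To illustrate how the table.whitelist and table.blacklist properties above interact, here is a Python sketch of the filtering rule as described: comma-separated regular expressions matched against schemaName.tableName, with the two properties mutually exclusive. It assumes each pattern must match the whole identifier, splits naively on commas (so patterns that themselves contain commas are not supported), and does not model the exclusion of system schemas; table_included is a hypothetical name.

```python
import re
from typing import List, Optional


def _patterns(csv: Optional[str]) -> List[re.Pattern]:
    """Split a comma-separated property value into compiled regular expressions."""
    if not csv:
        return []
    return [re.compile(part.strip()) for part in csv.split(",") if part.strip()]


def table_included(table_id: str, whitelist: Optional[str] = None,
                   blacklist: Optional[str] = None) -> bool:
    """Decide whether schemaName.tableName would be captured (hypothetical helper)."""
    if whitelist and blacklist:
        raise ValueError("table.whitelist and table.blacklist may not be used together")
    if whitelist:
        return any(p.fullmatch(table_id) for p in _patterns(whitelist))
    if blacklist:
        return not any(p.fullmatch(table_id) for p in _patterns(blacklist))
    return True  # no filter: every table is included (system schemas are not modeled here)


print(table_included("public.inventory", whitelist="public.inventory"))  # True
print(table_included("audit.log", blacklist=r"audit\..*, temp\..*"))     # False
```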

The following advanced configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector’s configuration.

Property Default Description

poll.interval.ms

1000

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second.

snapshot.mode

initial

Specifies the criteria for running a snapshot upon startup of the connector. The default is initial, and specifies the connector can run a snapshot only when no offsets have been recorded for the logical server name. The always option specifies that the connector run a snapshot each time on startup. The never option specifies that the connector should never use snapshots and that upon first startup with a logical server name the connector should read either from where it last left off (last LSN position) or start from the beginning from the point of view of the logical replication slot. Finally, the initial_only option specifies that the connector should only take an initial snapshot and then stop, without processing any subsequent changes. See snapshots.

snapshot.lock.timeout.ms

10000

Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail. See snapshots.

rows.fetch.size

10240

Positive integer value that specifies the maximum number of rows that should be read in one go from each table while taking a snapshot.

max.queue.size

20240

Positive integer value that specifies the maximum size of the blocking queue into which change events received via streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slow or Kafka is not available.

max.batch.size

10240

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.
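The max.queue.size and max.batch.size properties describe a familiar bounded-buffer arrangement: a full queue makes the producer wait, and each processing iteration takes at most one batch. The Python sketch below uses queue.Queue as a stand-in to show that relationship; it is an illustration of the concept, not the connector's internals, and the tiny sizes and the drain_batch helper are hypothetical.

```python
import queue
from typing import Any, List

MAX_QUEUE_SIZE = 5  # stand-in for max.queue.size (tiny for illustration)
MAX_BATCH_SIZE = 3  # stand-in for max.batch.size


def drain_batch(events: queue.Queue, max_batch_size: int) -> List[Any]:
    """Take up to max_batch_size events without blocking, as one processing iteration might."""
    batch: List[Any] = []
    while len(batch) < max_batch_size:
        try:
            batch.append(events.get_nowait())
        except queue.Empty:
            break
    return batch


events: queue.Queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
for event_id in range(MAX_QUEUE_SIZE):
    events.put(event_id)  # put() would block here if the queue were full: that is the backpressure
print(events.full())                        # True
print(drain_batch(events, MAX_BATCH_SIZE))  # [0, 1, 2]
print(drain_batch(events, MAX_BATCH_SIZE))  # [3, 4]
```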

The connector also supports pass-through configuration properties that are used when creating the Kafka producer and consumer.

Be sure to consult the Kafka documentation for all of the configuration properties for Kafka producers and consumers. (The PostgreSQL connector does use the new consumer.)
