Debezium connector for JDBC
Overview
The Debezium JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver. This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.
How the JDBC connector works
The Debezium JDBC connector is a Kafka Connect sink connector, and therefore requires the Kafka Connect runtime. The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database. The connector supports idempotent write operations by using upsert semantics and basic schema evolution.
The Debezium JDBC connector provides the following features:
Consuming complex Debezium change events
By default, Debezium source connectors produce complex, hierarchical change events.
When Debezium connectors are used with other JDBC sink connector implementations, you might need to apply the ExtractNewRecordState
single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation.
If you run the Debezium JDBC sink connector, it’s not necessary to deploy the SMT, because the Debezium sink connector can consume native Debezium change events directly, without the use of a transformation.
When the JDBC sink connector consumes a complex change event from a Debezium source connector, it extracts the values from the after
section of the original insert
or update
event.
When a delete event is consumed by the sink connector, no part of the event’s payload is consulted.
The Debezium JDBC sink connector is not designed to read from schema change topics.
If your source connector is configured to capture schema changes, in the JDBC connector configuration, set the |
At-least-once delivery
The Debezium JDBC sink connector guarantees that events that is consumes from Kafka topics are processed at least once.
Multiple tasks
You can run the Debezium JDBC sink connector across multiple Kafka Connect tasks.
To run the connector across multiple tasks, set the tasks.max
configuration property to the number of tasks that you want the connector to use.
The Kafka Connect runtime starts the specified number of tasks, and runs one instance of the connector per task.
Multiple tasks can improve performance by reading and processing changes from multiple source topics in parallel.
Data and column type mappings
To enable the Debezium JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event.
The connector supports a wide range of column type mappings across different database dialects.
To correctly convert the destination column type from the type
metadata in an event field, the connector applies the data type mappings that are defined for the source database.
You can enhance the way that the connector resolves data types for a column by setting the column.propagate.source.type
or datatype.propagate.source.type
options in the source connector configuration.
When you enable these options, Debezium includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.
For the Debezium JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a Struct
.
In addition, the payload of the source message must be a Struct
that has either a flattened structure with no nested struct
types, or a nested struct
layout that conforms to Debezium’s complex, hierarchical structure.
If the structure of the events in the Kafka topic do not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.
Primary key handling
By default, the Debezium JDBC sink connector does not transform any of the fields in the source event into the primary key for the event. Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics. To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:
Mode | Description |
---|---|
|
No primary key fields are specified when creating the table. |
|
The primary key consists of the following three columns:
The values for these columns are sourced from the coordinates of the Kafka event. |
|
The primary key is composed of the Kafka event’s key. |
|
The primary key is composed of the Kafka event’s value. |
|
The primary key is composed of the Kafka event’s headers. |
Some database dialects might throw an exception if you set the
|
If a column maps to a data type that isn’t permitted as a primary key for your target database, an explicit list of columns will be necessary in |
Delete mode
The Debezium JDBC sink connector can delete rows in the destination database when a DELETE
or tombstone event is consumed.
By default, the JDBC sink connector does not enable delete mode.
If you want to the connector to remove rows, you must explicitly set delete.enabled=true
in the connector configuration.
To use this mode you must also set primary.key.fields
to a value other than none
.
The preceding configuration is necessary, because deletes are executed based on the primary key mapping, so if a destination table has no primary key mapping, the connector is unable to delete rows.
Idempotent writes
The Debezium JDBC sink connector can perform idempotent writes, enabling it to replay the same records repeatedly and not change the final database state.
To enable the connector to perform idempotent writes, you must be explicitly set the insert.mode
for the connector to upsert
.
An upsert
operation is applied as either an update
or an insert
, depending on whether the specified primary key already exists.
If the primary key value already exists, the operation updates values in the row.
If the specified primary key value doesn’t exist, an insert
adds a new row.
Each database dialect handles idempotent writes differently, because there is no SQL standard for upsert operations.
The following table shows the upsert
DML syntax for the database dialects that Debezium supports:
Dialect | Upsert Syntax |
---|---|
Db2 |
|
MySQL |
|
Oracle |
|
PostgreSQL |
|
SQL Server |
|
Schema evolution
You can use the following schema evolution modes with the Debezium JDBC sink connector:
Mode | Description |
---|---|
|
The connector does not perform any DDL schema evolution. |
|
The connector automatically detects fields that are in the event payload but that do not exist in the destination table. The connector alters the destination table to add the new fields. |
When schema.evolution
is set to basic
, the connector automatically creates or alters the destination database table according to the structure of the incoming event.
When an event is received from a topic for the first time, and the destination table does not yet exist, the Debezium JDBC sink connector uses the event’s key, or the schema structure of the record to resolve the column structure of the table.
If schema evolution is enabled, the connector prepares and executes a CREATE TABLE
SQL statement before it applies the DML event to the destination table.
When the Debezium JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event’s key or its schema structure to identify which columns are new, and must be added to the database table.
If schema evolution is enabled, the connector prepares and executes an ALTER TABLE
SQL statement before it applies the DML event to the destination table.
Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.
The schema of each field determines whether a column is NULL
or NOT NULL
.
The schema also defines the default values for each column.
If the connector attempts to create a table with a nullability setting or a default value that don’t want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event.
To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or modifies the column state defined in the source database.
A field’s data type is resolved based on a predefined set of mappings. For more information, see Data type mappings.
When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema. If you want a field to be removed from the destination table, use one of the following options:
|
Quoting and case sensitivity
The Debezium JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database. By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table. The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings. As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.
For example, if the destination database dialect is Oracle and the event’s topic is orders
, the destination table will be created as ORDERS
because Oracle defaults to upper-case names when the name is not quoted.
Similarly, if the destination database dialect is PostgreSQL and the event’s topic is ORDERS
, the destination table will be created as orders
because PostgreSQL defaults to lower-case names when the name is not quoted.
To explicitly preserve the case of the table and field names that are present in a Kafka event, in the connector configuration, set the value of the quote.identifiers
property to true
.
When this options is set, when an incoming event is for a topic called orders
, and the destination database dialect is Oracle, the connector creates a table with the name orders
, because the constructed SQL defines the name of the table as "orders"
.
Enabling quoting results in the same behavior when the connector creates column names.
Connection Idle Timeouts
The JDBC sink connector for Debezium leverages a connection pool to enhance performance. Connection pools are engineered to establish an initial set of connections, maintain a specified number of connections, and efficiently allocate connections to the application as required. However, a challenge arises when connections linger idle in the pool, potentially triggering timeouts if they remain inactive beyond the configured idle timeout threshold of the database.
To mitigate the potential for idle connection threads to trigger timeouts, connection pools offer a mechanism that periodically validates the activity of each connection. This validation ensures that connections remain active, and prevents the database from flagging them as idle. In the event of a network disruption, if Debezium attempts to use a terminated connection, the connector prompts the pool to generate a new connection.
By default, the Debezium JDBC sink connector does not conduct idle timeout tests.
However, you can configure the connector to request the pool to perform timeout tests at a specified interval by setting the hibernate.c3p0.idle_test_period
property.
For example:
{
"hibernate.c3p0.idle_test_period": "300"
}
The Debezium JDBC sink connector uses the Hibernate C3P0 connection pool. You can customize the CP30 connection pool by setting properties in the hibernate.c3p0.*` configuration namespace. In the preceding example, the setting of the hibernate.c3p0.idle_test_period property configures the connection pool to perform idle timeout tests every 300 seconds. After you apply the configuration, the connection pool begins to assess unused connections every five minutes.
Data type mappings
The Debezium JDBC sink connector resolves a column’s data type by using a logical or primitive type-mapping system.
Primitive types include values such as integers, floating points, Booleans, strings, and bytes.
Typically, these types are represented with a specific Kafka Connect Schema
type code only.
Logical data types are more often complex types, including values such as Struct
-based types that have a fixed set of field names and schema, or values that are represented with a specific encoding, such as number of days since epoch.
The following examples show representative structures of primitive and logical data types:
{
"schema": {
"type": "INT64"
}
}
{
"schema": {
"type": "INT64",
"name": "org.apache.kafka.connect.data.Date"
}
}
Kafka Connect is not the only source for these complex, logical types. In fact, Debezium source connectors generate change events that have fields with similar logical types to represent a variety of different data types, including but not limited to, timestamps, dates, and even JSON data.
The Debezium JDBC sink connector uses these primitive and logical types to resolve a column’s type to a JDBC SQL code, which represents a column’s type. These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column’s type to a logical data type for the dialect in use. The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between Debezium and JDBC SQL types. The actual final column type varies with for each database type.
Primitive Type | JDBC SQL Type |
---|---|
INT8 |
Types.TINYINT |
INT16 |
Types.SMALLINT |
INT32 |
Types.INTEGER |
INT64 |
Types.BIGINT |
FLOAT32 |
Types.FLOAT |
FLOAT64 |
Types.DOUBLE |
BOOLEAN |
Types.BOOLEAN |
STRING |
Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR |
BYTES |
Types.VARBINARY |
Logical Type | JDBC SQL Type |
---|---|
org.apache.kafka.connect.data.Decimal |
Types.DECIMAL |
org.apache.kafka.connect.data.Date |
Types.DATE |
org.apache.kafka.connect.data.Time |
Types.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp |
Types.TIMESTAMP |
Logical Type | JDBC SQL Type |
---|---|
io.debezium.time.Date |
Types.DATE |
io.debezium.time.Time |
Types.TIMESTAMP |
io.debezium.time.MicroTime |
Types.TIMESTAMP |
io.debezium.time.NanoTime |
Types.TIMESTAMP |
io.debezium.time.ZonedTime |
Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp |
Types.TIMESTAMP |
io.debezium.time.MicroTimestamp |
Types.TIMESTAMP |
io.debezium.time.NanoTimestamp |
Types.TIMESTAMP |
io.debezium.time.ZonedTimestamp |
Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal |
Types.DOUBLE |
If the database does not support time or timestamps with time zones, the mapping resolves to its equivalent without timezones. |
Logical Type | MySQL SQL Type | PostgreSQL SQL Type | SQL Server SQL Type |
---|---|---|---|
io.debezium.data.Bits |
|
|
|
io.debezium.data.Enum |
|
Types.VARCHAR |
n/a |
io.debezium.data.Json |
|
|
n/a |
io.debezium.data.EnumSet |
|
n/a |
n/a |
io.debezium.time.Year |
|
n/a |
n/a |
io.debezium.time.MicroDuration |
n/a |
|
n/a |
io.debezium.data.Ltree |
n/a |
|
n/a |
io.debezium.data.Uuid |
n/a |
|
n/a |
io.debezium.data.Xml |
n/a |
|
|
Logical Type | Db2 | MySQL | PostgreSQL | Oracle | SQL Server |
---|---|---|---|---|---|
io.debezium.data.DoubleVector |
Unsupported |
'vector' |
|
Unsupported |
Unsupported |
io.debezium.data.FloatVector |
Unsupported |
'vector' |
|
Unsupported |
Unsupported |
io.debezium.data.SparseVector |
Unsupported |
Unsupported |
|
Unsupported |
Unsupported |
If there is not a direct mapping between one of the vector logical types and your target relational database’s column types, you can use the |
In addition to the primitive and logical mappings above, if the source of the change events is a Debezium source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation. To enforce propagation, one of the following properties must be set in the source connector configuration:
-
column.propagate.source.type
-
datatype.propagate.source.type
The Debezium JDBC sink connector applies the values with the higher precedence.
For example, let’s say the following field schema is included in a change event:
{
"schema": {
"type": "INT8",
"parameters": {
"__debezium.source.column.type": "TINYINT",
"__debezium.source.column.length": "1"
}
}
}
In the preceding example, if no schema parameters are set, the Debezium JDBC sink connector maps this field to a column type of Types.SMALLINT
.
Types.SMALLINT
can have different logical database types, depending on the database dialect.
For MySQL, the column type in the example converts to a TINYINT
column type with no specified length.
If column or data type propagation is enabled for the source connector, the Debezium JDBC sink connector uses the mapping information to refine the data type mapping process and create a column with the type TINYINT(1)
.
Typically, the effect of using column or data type propagation is much greater when the same type of database is used for both the source and sink database. We are continually looking at ways to improve this mapping across heterogeneous databases and the current type system allows us to continue to refine these mappings based on feedback. If you find a mapping could be improved, please let us know. |
Deployment
To deploy a Debezium JDBC connector, you install the Debezium JDBC connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.
-
Apache ZooKeeper, Apache Kafka, and Kafka Connect are installed.
-
A destination database is installed and configured to accept JDBC connections.
-
Download the Debezium JDBC connector plug-in archive.
-
Extract the files into your Kafka Connect environment.
-
Optionally download the JDBC driver from Maven Central and extract the downloaded driver file to the directory that contains the JDBC sink connector JAR file.
Drivers for Oracle and Db2 are not included with the JDBC sink connector. You must download the drivers and install them manually.
-
Add the driver JAR files to the path where the JDBC sink connector has been installed.
-
Make sure that the path where you install the JDBC sink connector is part of the Kafka Connect
plugin.path
. -
Restart the Kafka Connect process to pick up the new JAR files.
Debezium JDBC connector configuration
Typically, you register a Debezium JDBC connector by submitting a JSON request that specifies the configuration properties for the connector.
The following example shows a JSON request for registering an instance of the Debezium JDBC sink connector that consumes events from a topic called orders
with the most common configuration settings:
{
"name": "jdbc-connector", (1)
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", (2)
"tasks.max": "1", (3)
"connection.url": "jdbc:postgresql://localhost/db", (4)
"connection.username": "pguser", (5)
"connection.password": "pgpassword", (6)
"insert.mode": "upsert", (7)
"delete.enabled": "true", (8)
"primary.key.mode": "record_key", (9)
"schema.evolution": "basic", (10)
"use.time.zone": "UTC", (11)
"topics": "orders" (12)
}
}
Descriptions of JDBC connector configuration settings
Item | Description |
---|---|
1 |
The name that is assigned to the connector when you register it with Kafka Connect service. |
2 |
The name of the JDBC sink connector class. |
3 |
The maximum number of tasks to create for this connector. |
4 |
The JDBC URL that the connector uses to connect to the sink database that it writes to. |
5 |
The name of the database user that is used for authentication. |
6 |
The password of the database user used for authentication. |
7 |
The insert.mode that the connector uses. |
8 |
Enables the deletion of records in the database. For more information, see the delete.enabled configuration property. |
9 |
Specifies the method used to resolve primary key columns. For more information, see the primary.key.mode configuration property. |
10 |
Enables the connector to evolve the destination database’s schema. For more information, see the schema.evolution configuration property. |
11 |
Specifies the timezone used when writing temporal field types. |
12 |
List of topics to consume, separated by commas. |
For a complete list of configuration properties that you can set for the Debezium JDBC connector, see JDBC connector properties.
You can send this configuration with a POST
command to a running Kafka Connect service.
The service records the configuration and starts a sink connector task(s) that performs the following operations:
-
Connects to the database.
-
Consumes events from subscribed Kafka topics.
-
Writes the events to the configured database.
Connector properties
The Debezium JDBC sink connector has several configuration properties that you can use to achieve the connector behavior that meets your needs. Many properties have default values. Information about the properties is organized as follows:
JDBC connector generic properties
Property | Default | Description |
---|---|---|
No default |
Unique name for the connector. A failure results if you attempt to reuse this name when registering a connector. This property is required by all Kafka Connect connectors. |
|
No default |
The name of the Java class for the connector.
For the Debezium JDBC connector, specify the value |
|
1 |
Maximum number of tasks to use for this connector. |
|
No default |
List of topics to consume, separated by commas.
Do not use this property in combination with the |
|
No default |
A regular expression that specifies the topics to consume.
Internally, the regular expression is compiled to a |
JDBC connector connection properties
Property | Default | Description |
---|---|---|
|
The connection provider implementation to use. |
|
No default |
The JDBC connection URL used to connect to the database. |
|
No default |
The name of the database user account that the connector uses to connect to the database. |
|
No default |
The password that the connector uses to connect to the database. |
|
|
Specifies the minimum number of connections in the pool. |
|
|
Specifies the maximum number of concurrent connections that the pool maintains. |
|
|
Specifies the number of connections that the connector attempts to acquire if the connection pool exceeds its maximum size. |
|
|
Specifies the number of seconds that an unused connection is kept before it is discarded. |
JDBC connector runtime properties
Property | Default | Description | ||
---|---|---|---|---|
|
Specifies the timezone used when inserting JDBC temporal values. |
|||
|
Specifies whether the connector processes |
|||
|
Specifies whether the connector processes
|
|||
|
Specifies the strategy used to insert events into the database. The following options are available:
|
|||
|
Specifies how the connector resolves the primary key columns from the event.
|
|||
No default |
Either the name of the primary key column or a comma-separated list of fields to derive the primary key from. |
|||
|
Specifies whether generated SQL statements use quotation marks to delimit table and column names. See the Quoting and case sensitivity section for more details. |
|||
|
Specifies how the connector evolves the destination table schemas. For more information, see Schema evolution. The following options are available:
|
|||
|
Specifies a string pattern that the connector uses to construct the names of destination tables. |
|||
|
Specifies the schema name where the PostgreSQL PostGIS extension is installed.
The default is |
|||
|
Specifies whether the connector automatically sets an |
|||
|
Specifies how many records to attempt to batch together into the destination table.
|
|||
|
Specifies whether to enable the Debezium JDBC connector’s reduction buffer. Choose one of the following settings:
To optimize query processing in a PostgreSQL sink database when the reduction buffer is enabled, you must also enable the database to execute the batched queries by adding the |
|||
empty string |
An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value.
Fully-qualified names for fields are of the form |
|||
empty string |
An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value.
Fully-qualified names for fields are of the form |
|||
5 |
Specifies the maximum number of retries that the connector performs after an attempt to flush changes to the target database results in certain database errors. If the number of retries exceeds the retry value, the sink connector enters a FAILED state. |
|||
1000 |
Specifies the number of milliseconds that the connector waits to retry a flush operation that failed.
|
JDBC connector extendable properties
Property | Default | Description |
---|---|---|
|
Specifies the fully-qualified class name of a |
|
|
Specifies the fully-qualified class name of a
|
JDBC connector hibernate.*
passthrough properties
Kafka Connect supports passthrough configuration, enabling you to modify the behavior of an underlying system by passing certain properties directly from the connector configuration.
By default, some Hibernate properties are exposed via the JDBC connector connection properties (for example, connection.url
, connection.username
, and connection.pool.*_size
),
and through the connector’s runtime properties (for example, use.time.zone
, quote.identifiers
).
If you want to customize other Hibernate behavior, you can take advantage of the passthrough mechanism by adding properties that use the hibernate.*
namespace to the connector configuration.
For example, to assist Hibernate in resolving the type and version of the target database, you can add the hibernate.dialect
property and set it to the fully qualified class name of the database, for example, org.hibernate.dialect.MariaDBDialect
.
Frequently asked questions
- Is the
ExtractNewRecordState
single message transformation required? -
No, that is actually one of the differentiating factors of the Debezium JDBC connector from other implementations. While the connector is capable of ingesting flattened events like its competitors, it can also ingest Debezium’s complex change event structure natively, without requiring any specific type of transformation.
- If a column’s type is changed, or if a column is renamed or dropped, is this handled by schema evolution?
-
No, the Debezium JDBC connector does not make any changes to existing columns. The schema evolution supported by the connector is quite basic. It simply compares the fields in the event structure to the table’s column list, and then adds any fields that are not yet defined as columns in the table. If a column’s type or default value change, the connector does not adjust them in the destination database. If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however existing rows with data in the old column remain unchanged. These types of schema changes should be handled manually.
- If a column’s type does not resolve to the type that I want, how can I enforce mapping to a different data type?
-
The Debezium JDBC connector uses a sophisticated type system to resolve a column’s data type. For details about how this type system resolves a specific field’s schema definition to a JDBC type, see the Data and column type mappings section. If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.
- How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?
-
In order to add a prefix or a suffix to the destination table name, adjust the collection.name.format connector configuration property to apply the prefix or suffix that you want. For example, to prefix all table names with
jdbc_
, specify thecollection.name.format
configuration property with a value ofjdbc_${topic}
. If the connector is subscribed to a topic calledorders
, the resulting table is created asjdbc_orders
. - Why are some columns automatically quoted, even though identifier quoting is not enabled?
-
In some situations, specific column or table names might be explicitly quoted, even when
quote.identifiers
is not enabled. This is often necessary when the column or table name starts with or uses a specific convention that would otherwise be considered illegal syntax. For example, when the primary.key.mode is set tokafka
, some databases only permit column names to begin with an underscore if the column’s name is quoted. Quoting behavior is dialect-specific, and varies among different types of database.