You are viewing documentation for an outdated version of Debezium.
If you want to view the latest stable version of this page, please go here.

Debezium connector for JDBC

This connector is currently in an incubating state. In future versions, we might change the exact semantics, configuration options, and so forth, depending on the feedback that we receive. Please let us know if you encounter any problems.

Overview

The Debezium JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver. This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.

How the JDBC connector works

The Debezium JDBC connector is a Kafka Connect sink connector, and therefore requires the Kafka Connect runtime. The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database. The connector supports idempotent write operations by using upsert semantics and basic schema evolution.

The Debezium JDBC connector provides the following features:

Consuming complex Debezium change events

By default, Debezium source connectors produce complex, hierarchical change events. When Debezium connectors are used with other JDBC sink connector implementations, you might need to apply the ExtractNewRecordState single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation. If you run the Debezium JDBC sink connector, it’s not necessary to deploy the SMT, because the Debezium sink connector can consume native Debezium change events directly, without the use of a transformation.

When the JDBC sink connector consumes a complex change event from a Debezium source connector, it extracts the values from the after section of the original insert or update event. When a delete event is consumed by the sink connector, no part of the event’s payload is consulted.

The Debezium JDBC sink connector is not designed to read from schema change topics. If your source connector is configured to capture schema changes, in the JDBC connector configuration, set the topics or topics.regex properties so that the connector does not consume from schema change topics.

At-least-once delivery

The Debezium JDBC sink connector guarantees that events that is consumes from Kafka topics are processed at least once.

Multiple tasks

You can run the Debezium JDBC sink connector across multiple Kafka Connect tasks. To run the connector across multiple tasks, set the tasks.max configuration property to the number of tasks that you want the connector to use. The Kafka Connect runtime starts the specified number of tasks, and runs one instance of the connector per task. Multiple tasks can improve performance by reading and processing changes from multiple source topics in parallel.

Data and column type mappings

To enable the Debezium JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event. The connector supports a wide range of column type mappings across different database dialects. To correctly convert the destination column type from the type metadata in an event field, the connector applies the data type mappings that are defined for the source database. You can enhance the way that the connector resolves data types for a column by setting the column.propagate.source.type or datatype.propagate.source.type options in the source connector configuration. When you enable these options, Debezium includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.

For the Debezium JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a Struct. In addition, the payload of the source message must be a Struct that has either a flattened structure with no nested struct types, or a nested struct layout that conforms to Debezium’s complex, hierarchical structure.

If the structure of the events in the Kafka topic do not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.

Primary key handling

By default, the Debezium JDBC sink connector does not transform any of the fields in the source event into the primary key for the event. Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics. To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:

Mode Description

none

No primary key fields are specified when creating the table.

kafka

The primary key consists of the following three columns:

  • __connect_topic

  • __connect_partition

  • __connect_offset

The values for these columns are sourced from the coordinates of the Kafka event.

record_key

The primary key is composed of the Kafka event’s key.

If the primary key is a primitive type, specify the name of the column to be used by setting the primary.key.fields property. If the primary key is a struct type, the fields in the struct are mapped as columns of the primary key. You can use the primary.key.fields property to restrict the primary key to a subset of columns.

record_value

The primary key is composed of the Kafka event’s value.

Because the value of a Kafka event is always a Struct, by default, all of the fields in the value become columns of the primary key. To use a subset of fields in the primary key, set the primary.key.fields property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.

record_header

The primary key is composed of the Kafka event’s headers.

Kafka event’s headers contains could contain multiple header that each one could be Struct or primitives data types, the connectors makes a Struct of these headers. Hence, all fields in this Struct become columns of the primary key. To use a subset of fields in the primary key, set the primary.key.fields property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.

Some database dialects might throw an exception if you set the primary.key.mode to kafka and set schema.evolution to basic. This exception occurs when a dialect maps a STRING data type mapping to a variable length string data type such as TEXT or CLOB, and the dialect does not allow primary key columns to have unbounded lengths. To avoid this problem, apply the following settings in your environment:

  • Do not set schema.evolution to basic.

  • Create the database table and primary key mappings in advance.

Delete mode

The Debezium JDBC sink connector can delete rows in the destination database when a DELETE or tombstone event is consumed. By default, the JDBC sink connector does not enable delete mode.

If you want to the connector to remove rows, you must explicitly set delete.enabled=true in the connector configuration. To use this mode you must also set primary.key.fields to a value other than none. The preceding configuration is necessary, because deletes are executed based on the primary key mapping, so if a destination table has no primary key mapping, the connector is unable to delete rows.

Idempotent writes

The Debezium JDBC sink connector can perform idempotent writes, enabling it to replay the same records repeatedly and not change the final database state.

To enable the connector to perform idempotent writes, you must be explicitly set the insert.mode for the connector to upsert. An upsert operation is applied as either an update or an insert, depending on whether the specified primary key already exists.

If the primary key value already exists, the operation updates values in the row. If the specified primary key value doesn’t exist, an insert adds a new row.

Each database dialect handles idempotent writes differently, because there is no SQL standard for upsert operations. The following table shows the upsert DML syntax for the database dialects that Debezium supports:

Dialect Upsert Syntax

Db2

MERGE …​

MySQL

INSERT …​ ON DUPLICATE KEY UPDATE …​

Oracle

MERGE …​

PostgreSQL

INSERT …​ ON CONFLICT …​ DO UPDATE SET …​

SQL Server

MERGE …​

Schema evolution

You can use the following schema evolution modes with the Debezium JDBC sink connector:

Mode Description

none

The connector does not perform any DDL schema evolution.

basic

The connector automatically detects fields that are in the event payload but that do not exist in the destination table. The connector alters the destination table to add the new fields.

When schema.evolution is set to basic, the connector automatically creates or alters the destination database table according to the structure of the incoming event.

When an event is received from a topic for the first time, and the destination table does not yet exist, the Debezium JDBC sink connector uses the event’s key, or the schema structure of the record to resolve the column structure of the table. If schema evolution is enabled, the connector prepares and executes a CREATE TABLE SQL statement before it applies the DML event to the destination table.

When the Debezium JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event’s key or its schema structure to identify which columns are new, and must be added to the database table. If schema evolution is enabled, the connector prepares and executes an ALTER TABLE SQL statement before it applies the DML event to the destination table. Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.

The schema of each field determines whether a column is NULL or NOT NULL. The schema also defines the default values for each column. If the connector attempts to create a table with a nullability setting or a default value that don’t want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event. To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or modifies the column state defined in the source database.

A field’s data type is resolved based on a predefined set of mappings. For more information, see Data type mappings.

When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema. If you want a field to be removed from the destination table, use one of the following options:

  • Remove the field manually.

  • Drop the column.

  • Assign a default value to the field.

  • Define the field a nullable.

Quoting and case sensitivity

The Debezium JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database. By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table. The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings. As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.

For example, if the destination database dialect is Oracle and the event’s topic is orders, the destination table will be created as ORDERS because Oracle defaults to upper-case names when the name is not quoted. Similarly, if the destination database dialect is PostgreSQL and the event’s topic is ORDERS, the destination table will be created as orders because PostgreSQL defaults to lower-case names when the name is not quoted.

To explicitly preserve the case of the table and field names that are present in a Kafka event, in the connector configuration, set the value of the quote.identifiers property to true. When this options is set, when an incoming event is for a topic called orders, and the destination database dialect is Oracle, the connector creates a table with the name orders, because the constructed SQL defines the name of the table as "orders". Enabling quoting results in the same behavior when the connector creates column names.

Data type mappings

The Debezium JDBC sink connector resolves a column’s data type by using a logical or primitive type-mapping system. Primitive types include values such as integers, floating points, Booleans, strings, and bytes. Typically, these types are represented with a specific Kafka Connect Schema type code only. Logical data types are more often complex types, including values such as Struct-based types that have a fixed set of field names and schema, or values that are represented with a specific encoding, such as number of days since epoch.

The following examples show representative structures of primitive and logical data types:

Primitive field schema
{
  "schema": {
    "type": "INT64"
  }
}
Logical field schema
[
  "schema": {
    "type": "INT64",
    "name": "org.apache.kafka.connect.data.Date"
  }
]

Kafka Connect is not the only source for these complex, logical types. In fact, Debezium source connectors generate change events that have fields with similar logical types to represent a variety of different data types, including but not limited to, timestamps, dates, and even JSON data.

The Debezium JDBC sink connector uses these primitive and logical types to resolve a column’s type to a JDBC SQL code, which represents a column’s type. These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column’s type to a logical data type for the dialect in use. The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between Debezium and JDBC SQL types. The actual final column type varies with for each database type.

Table 1. Mappings between Kafka Connect Primitives and Column Data Types
Primitive Type JDBC SQL Type

INT8

Types.TINYINT

INT16

Types.SMALLINT

INT32

Types.INTEGER

INT64

Types.BIGINT

FLOAT32

Types.FLOAT

FLOAT64

Types.DOUBLE

BOOLEAN

Types.BOOLEAN

STRING

Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR

BYTES

Types.VARBINARY

Table 2. Mappings between Kafka Connect Logical Types and Column Data Types
Logical Type JDBC SQL Type

org.apache.kafka.connect.data.Decimal

Types.DECIMAL

org.apache.kafka.connect.data.Date

Types.DATE

org.apache.kafka.connect.data.Time

Types.TIMESTAMP

org.apache.kafka.connect.data.Timestamp

Types.TIMESTAMP

Table 3. Mappings between Debezium Logical Types and Column Data Types
Logical Type JDBC SQL Type

io.debezium.time.Date

Types.DATE

io.debezium.time.Time

Types.TIMESTAMP

io.debezium.time.MicroTime

Types.TIMESTAMP

io.debezium.time.NanoTime

Types.TIMESTAMP

io.debezium.time.ZonedTime

Types.TIME_WITH_TIMEZONE

io.debezium.time.Timestamp

Types.TIMESTAMP

io.debezium.time.MicroTimestamp

Types.TIMESTAMP

io.debezium.time.NanoTimestamp

Types.TIMESTAMP

io.debezium.time.ZonedTimestamp

Types.TIMESTAMP_WITH_TIMEZONE

io.debezium.data.VariableScaleDecimal

Types.DOUBLE

If the database does not support time or timestamps with time zones, the mapping resolves to its equivalent without timezones.

Table 4. Mappings between Debezium dialect-specific Logical Types and Column Data Types
Logical Type MySQL SQL Type PostgreSQL SQL Type SQL Server SQL Type

io.debezium.data.Bits

bit(n)

bit(n) or bit varying

varbinary(n)

io.debezium.data.Enum

enum

Types.VARCHAR

n/a

io.debezium.data.Json

json

json

n/a

io.debezium.data.EnumSet

set

n/a

n/a

io.debezium.time.Year

year(n)

n/a

n/a

io.debezium.time.MicroDuration

n/a

interval

n/a

io.debezium.data.Ltree

n/a

ltree

n/a

io.debezium.data.Uuid

n/a

uuid

n/a

io.debezium.data.Xml

n/a

xml

xml

In addition to the primitive and logical mappings above, if the source of the change events is a Debezium source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation. To enforce propagation, one of the following properties must be set in the source connector configuration:

  • column.propagate.source.type

  • datatype.propagate.source.type

The Debezium JDBC sink connector applies the values with the higher precedence.

For example, let’s say the following field schema is included in a change event:

Debezium change event field schema with column or data type propagation enabled
{
  "schema": {
    "type": "INT8",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}

In the preceding example, if no schema parameters are set, the Debezium JDBC sink connector maps this field to a column type of Types.SMALLINT. Types.SMALLINT can have different logical database types, depending on the database dialect. For MySQL, the column type in the example converts to a TINYINT column type with no specified length. If column or data type propagation is enabled for the source connector, the Debezium JDBC sink connector uses the mapping information to refine the data type mapping process and create a column with the type TINYINT(1).

Typically, the effect of using column or data type propagation is much greater when the same type of database is used for both the source and sink database. We are continually looking at ways to improve this mapping across heterogeneous databases and the current type system allows us to continue to refine these mappings based on feedback. If you find a mapping could be improved, please let us know.

Deployment

To deploy a Debezium JDBC connector, you install the Debezium JDBC connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.

Prerequisites
Procedure
  1. Download the Debezium JDBC connector plug-in archive.

  2. Extract the files into your Kafka Connect environment.

  3. Optionally download the JDBC driver from Maven Central and extract the downloaded driver file to the directory that contains the JDBC sink connector JAR file.

    Drivers for Oracle and Db2 are not included with the JDBC sink connector. You must download the drivers and install them manually.

  4. Add the driver JAR files to the path where the JDBC sink connector has been installed.

  5. Make sure that the path where you install the JDBC sink connector is part of the Kafka Connect plugin.path.

  6. Restart the Kafka Connect process to pick up the new JAR files.

Debezium JDBC connector configuration

Typically, you register a Debezium JDBC connector by submitting a JSON request that specifies the configuration properties for the connector. The following example shows a JSON request for registering an instance of the Debezium JDBC sink connector that consumes events from a topic called orders with the most common configuration settings:

Example: Debezium JDBC connector configuration
{
    "name": "jdbc-connector",  (1)
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  (2)
        "tasks.max": "1",  (3)
        "connection.url": "jdbc:postgresql://localhost/db",  (4)
        "connection.username": "pguser",  (5)
        "connection.password": "pgpassword",  (6)
        "insert.mode": "upsert",  (7)
        "delete.enabled": "true",  (8)
        "primary.key.mode": "record_key",  (9)
        "schema.evolution": "basic",  (10)
        "database.time_zone": "UTC",  (11)
        "topics": "orders" (12)
    }
}
1 The name that is assigned to the connector when you register it with Kafka Connect service.
2 The name of the JDBC sink connector class.
3 The maximum number of tasks to create for this connector.
4 The JDBC URL that the connector uses to connect to the sink database that it writes to.
5 The name of the database user used for authentication.
6 The password of the database user used for authentication.
7 The insert.mode that the connector uses.
8 Enables the deletion of records in the database. For more information, see the delete.enabled configuration property.
9 Specifies the method used to resolve primary key columns. For more information, see the primary.key.mode configuration property.
10 Enables the connector to evolve the destination database’s schema. For more information, see the schema.evolution configuration property.
11 Specifies the timezone used when writing temporal field types.
12 List of topics to consume, separated by commas.

For a complete list of configuration properties that you can set for the Debezium JDBC connector, see JDBC connector properties.

You can send this configuration with a POST command to a running Kafka Connect service. The service records the configuration and starts a sink connector task(s) that performs the following operations:

  • Connects to the database.

  • Consumes events from subscribed Kafka topics.

  • Writes the events to the configured database.

Connector properties

The Debezium JDBC sink connector has several configuration properties that you can use to achieve the connector behavior that meets your needs. Many properties have default values. Information about the properties is organized as follows:

Table 5. Generic properties
Property Default Description

No default

Unique name for the connector. A failure results if you attempt to reuse this name when registering a connector. This property is required by all Kafka Connect connectors.

No default

The name of the Java class for the connector. For the Debezium JDBC connector, specify the value io.debezium.connector.jdbc.JdbcSinkConnector.

1

Maximum number of tasks to use for this connector.

No default

List of topics to consume, separated by commas. Do not use this property in combination with the topics.regex property.

No default

A regular expression that specifies the topics to consume. Internally, the regular expression is compiled to a java.util.regex.Pattern. Do not use this property in combination with the topics property.

Table 6. JDBC connector connection properties
Property Default Description

No default

The JDBC connection URL used to connect to the database.

No default

The name of the database user account that the connector uses to connect to the database.

No default

The password that the connector uses to connect to the database.

5

Specifies the minimum number of connections in the pool.

32

Specifies the maximum number of concurrent connections that the pool maintains.

32

Specifies the number of connections that the connector attempts to acquire if the connection pool exceeds its maximum size.

1800

Specifies the number of seconds that an unused connection is kept before it is discarded.

Table 7. JDBC connector runtime properties
Property Default Description

UTC

Specifies the timezone used when inserting JDBC temporal values.

false

Specifies whether the connector processes DELETE or tombstone events and removes the corresponding row from the database. Use of this option requires that you set the primary.key.mode to record.key.

false

Specifies whether the connector processes TRUNCATE events and truncates the corresponding tables from the database.

Although support for TRUNCATE statements has been available in Db2 since version 9.7, currently, the JDBC connector is unable to process standard TRUNCATE events that the Db2 connector emits.

To ensure that the JDBC connector can process TRUNCATE events received from Db2, perform the truncation by using an alternative to the standard TRUNCATE TABLE statement. For example:

ALTER TABLE <table_name> ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE

The user account that submits the preceding query requires ALTER privileges on the table to be truncated.

insert

Specifies the strategy used to insert events into the database. The following options are available:

insert

Specifies that all events should construct INSERT-based SQL statements. Use this option only when no primary key is used, or when you can be certain that no updates can occur to rows with existing primary key values.

update

Specifies that all events should construct UPDATE-based SQL statements. Use this option only when you can be certain that the connector receives only events that apply to existing rows.

upsert

Specifies that the connector adds events to the table using upsert semantics. That is, if the primary key does not exist, the connector performs an INSERT operation, and if the key does exist, the connector performs an UPDATE operation. When idempotent writes are required, the connector should be configured to use this option.

none

Specifies how the connector resolves the primary key columns from the event.

none

Specifies that no primary key columns are created.

kafka

Specifies that the connector uses Kafka coordinates as the primary key columns. The key coordinates are defined from the topic name, partition, and offset of the event, and are mapped to columns with the following names:

  • __connect_topic

  • __connect_partition

  • __connect_offset

record_key

Specifies that the primary key columns are sourced from the event’s record key. If the record key is a primitive type, the primary.key.fields property is required to specify the name of the primary key column. If the record key is a struct type, the primary.key.fields property is optional, and can be used to specify a subset of columns from the event’s key as the table’s primary key.

record_value

Specifies that the primary key columns is sourced from the event’s value. You can set the primary.key.fields property to define the primary key as a subset of fields from the event’s value; otherwise all fields are used by default.

No default

Either the name of the primary key column or a comma-separated list of fields to derive the primary key from.

When primary.key.mode is set to record_key and the event’s key is a primitive type, it is expected that this property specifies the column name to be used for the key.

When the primary.key.mode is set to record_key with a non-primitive key, or record_value, it is expected that this property specifies a comma-separated list of field names from either the key or value. If the primary.key.mode is set to record_key with a non-primitive key, or record_value, and this property is not specifies, the connector derives the primary key from all fields of either the record key or record value, depending on the specified mode.

false

Specifies whether generated SQL statements use quotation marks to delimit table and column names. See the Quoting and case sensitivity section for more details.

none

Specifies how the connector evolves the destination table schemas. For more information, see Schema evolution. The following options are available:

none

Specifies that the connector does not evolve the destination schema.

basic

Specifies that basic evolution occurs. The connector adds missing columns to the table by comparing the incoming event’s record schema to the database table structure.

${topic}

Specifies a string that determines how the destination table name is formatted, based on the topic name of the event. The placeholder, ${topic}, is replaced by the topic name.

public

Specifies the schema name where the PostgreSQL PostGIS extension is installed. The default is public; however, if the PostGIS extension was installed in another schema, this property should be used to specify the alternate schema name.

false

Specifies whether the connector automatically sets an IDENTITY_INSERT before an INSERT or UPSERT operation into the identity column of SQL Server tables, and then unsets it immediately after the operation. When the default setting (false) is in effect, an INSERT or UPSERT operation into the IDENTITY column of a table results in a SQL exception.

Table 8. JDBC connector extendable properties
Property Default Description

i.d.c.j.n.DefaultColumnNamingStrategy

Specifies the fully-qualified class name of a ColumnNamingStrategy implementation that the connector uses to resolve column names from event field names.

By default, the connector uses the field name as the column name.

i.d.c.j.n.DefaultTableNamingStrategy

Specifies the fully-qualified class name of a TableNamingStrategy implementation that the connector uses to resolve table names from incoming event topic names.

The default behavior is to:

  • Replace the ${topic} placeholder in the table.name.format configuration property with the event’s topic.

  • Sanitize the table name by replacing dots (.) with underscores (_).

Frequently asked questions

Is the ExtractNewRecordState single message transformation required?

No, that is actually one of the differentiating factors of the Debezium JDBC connector from other implementations. While the connector is capable of ingesting flattened events like its competitors, it can also ingest Debezium’s complex change event structure natively, without requiring any specific type of transformation.

If a column’s type is changed, or if a column is renamed or dropped, is this handled by schema evolution?

No, the Debezium JDBC connector does not make any changes to existing columns. The schema evolution supported by the connector is quite basic. It simply compares the fields in the event structure to the table’s column list, and then adds any fields that are not yet defined as columns in the table. If a column’s type or default value change, the connector does not adjust them in the destination database. If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however existing rows with data in the old column remain unchanged. These types of schema changes should be handled manually.

If a column’s type does not resolve to the type that I want, how can I enforce mapping to a different data type?

The Debezium JDBC connector uses a sophisticated type system to resolve a column’s data type. For details about how this type system resolves a specific field’s schema definition to a JDBC type, see the Data and column type mappings section. If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.

How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?

In order to add a prefix or a suffix to the destination table name, adjust the table.name.format connector configuration property to apply the prefix or suffix that you want. For example, to prefix all table names with jdbc_, specify the table.name.format configuration property with a value of jdbc_${topic}. If the connector is subscribed to a topic called orders, the resulting table is created as jdbc_orders.

Why are some columns automatically quoted, even though identifier quoting is not enabled?

In some situations, specific column or table names might be explicitly quoted, even when quote.identifiers is not enabled. This is often necessary when the column or table name starts with or uses a specific convention that would otherwise be considered illegal syntax. For example, when the primary.key.mode is set to kafka, some databases only permit column names to begin with an underscore if the column’s name is quoted. Quoting behavior is dialect-specific, and varies among different types of database.