Debezium connector for Cloud Spanner
This connector is currently in an incubating state: exact semantics, configuration options, and so on may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems.
Debezium’s Cloud Spanner connector consumes and outputs Spanner change streams data into a Kafka topic.
A Spanner change stream watches and streams out a Spanner database’s data changes - inserts, updates, deletes - in near-real-time. The Spanner connector abstracts away the details of querying the Spanner change streams. With this connector, you don’t have to manage the change streams partition lifecycle, which is necessary when you use the Spanner API directly.
The connector does not currently support snapshots. The first time the connector connects to a Spanner database, it streams changes from the provided start timestamp, or from the current timestamp if no start timestamp is provided.
Overview
To read and process database changes, the connector queries from a change stream.
The connector produces a change event for every row-level insert, update, and delete operation that was captured and sends change event records for each table in a separate Kafka topic. Client applications read the Kafka topics that correspond to the database tables of interest, and can react to every row-level event they receive from those topics.
The connector is tolerant of failures. As the connector reads changes and produces events, it records the last commit timestamp processed for each change stream partition. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart the connector continues streaming records where it last left off.
How the connector works
To optimally configure and run a Debezium Spanner connector, it is helpful to understand how the connector streams change events, determines Kafka topic names, and uses metadata.
Streaming changes
The Debezium Spanner connector spends all its time streaming changes from the change stream to which it is subscribed. When a change occurs on a table, Spanner writes a corresponding change stream record in the database, synchronously in the same transaction as the data change. To scale change stream writes and reads, Spanner splits and merges the internal change stream storage along with the database data. To support reading change stream records in near real time as database writes scale, the Spanner API is designed so that a change stream can be queried concurrently through its change stream partitions. For details, see the Spanner documentation on the change stream partitioning model.
The connector offers an abstraction over the Spanner API for querying change streams. With this connector, you don’t have to manage the change stream partition lifecycle. The connector provides you with a stream of data change records so that you are free to focus more on application logic and less on specific API details and dynamic change stream partitioning.
When subscribing to a change stream, the connector needs the project ID, Spanner instance ID, Spanner database ID, and the change stream name. Optionally, you can also provide a start timestamp and an end timestamp. See the Connector configuration properties section for a more detailed list of the connector configuration properties.
When the connector receives changes it transforms the events into Debezium create, update, or delete events. The connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.
Periodically, Kafka Connect records the most recent offset in another Kafka topic. The offset indicates source-specific position information that Debezium includes with each event. For the Spanner connector, the offset consists of the last processed commit timestamps for the change stream partitions.
When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset.
The Spanner connector also records metadata in the following metadata topics during streaming. It is not advisable to modify the content or the configuration of these topics:
- _consumer_offsets: A topic automatically created by Kafka. Stores consumer offsets for consumers created in the Kafka connector.
- _kafka-connect-offsets: A topic automatically created by Kafka Connect. Stores the connector offsets.
- _sync_topic_spanner_connector_connectorname: A topic automatically created by the connector. Stores metadata regarding change stream partitions.
- _rebalancing_topic_spanner_connector_connectorname: A topic automatically created by the connector. Used to determine connector task aliveness.
- _debezium-heartbeat.connectorname: A topic used to process Spanner change stream heartbeats.
Topic names
The Spanner connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name is topicPrefix.connectorName.tableName where:
- topicPrefix is the topic prefix as specified by the topic.prefix connector configuration property.
- connectorName is the user-specified name of the connector.
- tableName is the name of the database table in which the operation occurred.
For example, suppose that spanner is the logical name in the configuration for a connector that is capturing changes from a Spanner change stream that tracks changes in a database that contains four tables: table1, table2, table3, and table4. The connector would stream records to these four Kafka topics:
- spanner.table1
- spanner.table2
- spanner.table3
- spanner.table4
Data change events
The Debezium Spanner connector generates a data change event for each row-level INSERT, UPDATE, and DELETE operation. Each event contains a key and a value. The key and value are separate documents. The structure of the key and the value depends on the table that was changed.
Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained. The schema for the key will never change. Please note that the schema for the value is an amalgamation of all the columns in the table that the change stream has tracked since the start time of the connector.
The following skeleton JSON documents show the basic structure for the key and value documents. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of the key and value documents. A schema field is in a change event key or change event value only when you configure the converter to produce it. Likewise, the event key and event payload are present only if you configure a converter to produce them. If you use the JSON converter and you configure it to produce schemas, change events have this structure:
// Key
{
"schema": { (1)
...
},
"payload": { (2)
...
}
}
// Value
{
"schema": { (3)
...
},
"payload": { (4)
...
}
}
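Item | Field name | Description |
---|---|---|
1 | schema | The first schema field is part of the event key. It describes the structure of the key’s payload. |
2 | payload | The first payload field is part of the event key. It contains the key for the row that was changed. |
3 | schema | The second schema field is part of the event value. It describes the structure of the value’s payload. |
4 | payload | The second payload field is part of the event value. It contains the data for the row that was changed. |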
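Because the converter configuration decides whether the schema and payload wrapper is present, consumer code often needs to handle both shapes. The following is a minimal Python sketch of that idea, not part of the connector: the helper name `unwrap` and the sample documents are hypothetical, and it assumes the JSON converter is in use.

```python
import json


def unwrap(document: str):
    """Return the payload of a key or value document.

    Handles both shapes: the {"schema": ..., "payload": ...} envelope that the
    JSON converter produces when schemas are enabled, and the bare payload
    that it produces when schemas are disabled.
    """
    parsed = json.loads(document)
    if isinstance(parsed, dict) and set(parsed) == {"schema", "payload"}:
        return parsed["payload"]
    return parsed


# Hypothetical sample documents for illustration only.
key_doc = '{"schema": {"type": "struct"}, "payload": {"id": 1}}'
value_doc = '{"before": null, "after": {"id": 1}, "op": "c"}'

print(unwrap(key_doc))          # {'id': 1}
print(unwrap(value_doc)["op"])  # c
```

A null value document passes through as `None`, so callers can treat it separately from regular change events.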
The default behavior is that the connector streams change event records to topics with names that are the same as the event’s originating table.
Starting with Kafka 0.10, Kafka can optionally record the event key and value with the timestamp at which the message was created (recorded by the producer) or written to the log by Kafka.
Change event keys
For a given table, the change event’s key has a structure that contains a field for each column in the primary key of the table at the time the event was created. All key columns will be marked as non-optional.
Consider a Users table defined in the business database and the example of a change event key for that table.
CREATE TABLE Users (
id INT64 NOT NULL,
username STRING(MAX) NOT NULL,
password STRING(MAX) NOT NULL,
email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
Every change event for the Users table while it has this definition has the same key structure, which in JSON looks like this:
{
"schema": { (1)
"type": "struct",
"name": "Users.Key", (2)
"optional": false, (3)
"fields": [ (4)
{
"type": "int64",
"optional": false,
"field": "id"
}
]
},
"payload": { (5)
"id": 1
}
}
Item | Field name | Description |
---|---|---|
1 | schema | The schema portion of the key specifies a Kafka Connect schema that describes what is in the key’s payload portion. |
2 | Users.Key | Name of the schema that defines the structure of the key’s payload. This schema describes the structure of the primary key for the table that was changed. |
3 | optional | Indicates whether the event key must contain a value in its payload field. In this example, a value in the key’s payload is required. |
4 | fields | Specifies each field that is expected in the payload, including each field’s name, type, and whether it is required. |
5 | payload | Contains the key for the row for which this change event was generated. In this example, the key contains a single id field whose value is 1. |
Change event values
Consider the same sample table that was used to show an example of a change event key:
CREATE TABLE Users (
id INT64 NOT NULL,
username STRING(MAX) NOT NULL,
password STRING(MAX) NOT NULL,
email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
create events
The following example shows the value portion of a change event that the connector generates for an operation that creates data in the Users table. If a Spanner column is marked as non-optional, its value is required for all mutations that insert a row. All primary key columns in Spanner are marked as non-optional. Note that even if a non-key column is marked as non-optional in Spanner, it is shown as optional in the schema. Only primary key columns are marked as non-optional in the schema.
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "first_name"
},
{
"type": "string",
"optional": true,
"field": "last_name"
},
{
"type": "string",
"optional": true,
"field": "email"
}
],
"optional": true,
"name": "Users.Value", (2)
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Users.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "project_id"
},
{
"type": "string",
"optional": false,
"field": "instance_id"
},
{
"type": "string",
"optional": false,
"field": "database_id"
},
{
"type": "string",
"optional": false,
"field": "change_stream_name"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "server_transaction_id"
},
{
"type": "int64",
"optional": true,
"field": "low_watermark"
},
{
"type": "int64",
"optional": true,
"field": "read_at_timestamp"
},
{
"type": "int64",
"optional": true,
"field": "number_of_records_in_transaction"
},
{
"type": "string",
"optional": true,
"field": "transaction_tag"
},
{
"type": "boolean",
"optional": true,
"field": "system_transaction"
},
{
"type": "string",
"optional": true,
"field": "value_capture_type"
},
{
"type": "string",
"optional": true,
"field": "partition_token"
},
{
"type": "int32",
"optional": true,
"field": "mod_number"
},
{
"type": "boolean",
"optional": true,
"field": "is_last_record_in_transaction_in_partition"
},
{
"type": "int64",
"optional": true,
"field": "number_of_partitions_in_transaction"
}
],
"optional": false,
"name": "io.debezium.connector.spanner.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "connector_name.Users.Envelope" (4)
},
"payload": { (5)
"before": null, (6)
"after": { (7)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (8)
"version": "2.1.4.Final",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "c", (9)
"ts_ms": 1559033904863 (10)
}
}
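Item | Field name | Description |
---|---|---|
1 | schema | The value’s schema, which describes the structure of the value’s payload. A change event’s value schema is the same in every change event that the connector generates for a particular table. |
2 | name | In the schema section, each name field specifies the schema for a field in the value’s payload. Users.Value is the schema for the payload’s before and after fields. |
3 | name | io.debezium.connector.spanner.Source is the schema for the payload’s source field. |
4 | name | connector_name.Users.Envelope is the schema for the overall structure of the payload, where connector_name is the connector name and Users is the table. |
5 | payload | The value’s actual data. This is the information that the change event is providing. |
6 | before | An optional field that specifies the state of the row before the event occurred. When the op field is c for create, as in this example, the before field is null because this change event is for new content. |
7 | after | An optional field that specifies the state of the row after the event occurred. In this example, the after field contains the values of the new row’s id, first_name, last_name, and email columns. |
8 | source | Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes the fields shown in the example: version, connector, name, ts_ms, snapshot, db, sequence, project_id, instance_id, database_id, change_stream_name, table, server_transaction_id, low_watermark, read_at_timestamp, the number of records in the transaction, transaction_tag, system_transaction, value_capture_type, partition_token, mod_number, is_last_record_in_transaction_in_partition, and number_of_partitions_in_transaction. |
9 | op | Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a row. The examples in this section use c (create), u (update), and d (delete). |
10 | ts_ms | Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |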
update events
The value of a change event for an update in the sample Users table has the same schema as a create event for that table. Likewise, the event value’s payload has the same structure. However, the event value payload contains different values in an update event. Here is an example of a change event value in an event that the connector generates for an update in the Users table:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": { (2)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (3)
"version": "2.1.4.Final",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "u", (4)
"ts_ms": 1465584025523 (5)
}
}
Item | Field name | Description |
---|---|---|
1 | before | An optional field that contains all values of all columns that were in the row before the database commit. |
2 | after | An optional field that specifies the state of the row after the event occurred. In this example, the first_name value is now Anne Marie. |
3 | source | Mandatory field that describes the source metadata for the event. The source field structure has the same fields as in a create event, but some values can differ. |
4 | op | Mandatory string that describes the type of operation. In an update event value, the op field value is u, signifying that this row changed because of an update. |
5 | ts_ms | Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |
delete events
The value in a delete change event has the same schema portion as create and update events for the same table. The payload portion in a delete event for the sample Users table looks like this:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null, (2)
"source": { (3)
"version": "2.1.4.Final",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "d", (4)
"ts_ms": 1465581902461 (5)
}
}
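Item | Field name | Description |
---|---|---|
1 | before | Optional field that specifies the state of the row before the event occurred. In a delete event value, the before field contains the values that were in the row before it was deleted. |
2 | after | Optional field that specifies the state of the row after the event occurred. In a delete event value, the after field is null, signifying that the row no longer exists. |
3 | source | Mandatory field that describes the source metadata for the event. In a delete event value, the source field structure is the same as for create and update events for the same table. |
4 | op | Mandatory string that describes the type of operation. The op field value is d, signifying that this row was deleted. |
5 | ts_ms | Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. |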
A delete change event record provides a consumer with the information it needs to process the removal of this row.
Spanner connector events are designed to work with Kafka log compaction. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state. Note that if the low watermark is enabled, compaction should not be enabled.
When a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be null. To make this possible, the connector follows a delete event with a special tombstone event that has the same key but a null value.
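The create, update, delete, and tombstone events described above can be folded into a local copy of a table. The following Python sketch shows one way a consumer might do that. It is an illustration under assumptions rather than connector code: the function name `apply_change` is hypothetical, the events are assumed to be already decoded into dictionaries (payload only), and the row key is assumed to be the single id column from the examples.

```python
def apply_change(table: dict, key, value) -> None:
    """Apply one decoded change event to an in-memory copy of a table.

    `table` maps primary key -> row, `key` is the event key payload reduced
    to a hashable value, and `value` is the event value payload, or None for
    a tombstone.
    """
    if value is None:
        # Tombstone: same key, null value. The preceding delete event has
        # already removed the row, so this is a no-op safeguard.
        table.pop(key, None)
        return

    op = value["op"]
    if op in ("c", "u"):
        # Create and update both carry the new row state in "after".
        table[key] = value["after"]
    elif op == "d":
        table.pop(key, None)
    else:
        raise ValueError(f"unexpected op: {op!r}")


users = {}
apply_change(users, 1, {"op": "c", "before": None,
                        "after": {"id": 1, "first_name": "Anne"}})
apply_change(users, 1, {"op": "u", "before": {"id": 1, "first_name": "Anne"},
                        "after": {"id": 1, "first_name": "Anne Marie"}})
print(users)  # {1: {'id': 1, 'first_name': 'Anne Marie'}}

apply_change(users, 1, {"op": "d", "before": {"id": 1, "first_name": "Anne Marie"},
                        "after": None})
apply_change(users, 1, None)  # tombstone
print(users)  # {}
```

Using `pop` with a default keeps deletes idempotent, which matters because events can be redelivered after a failure.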
Data type mappings
The Spanner connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the Spanner data type of the column. This section describes these mappings.
Setting up Spanner
- Make sure to provide a project ID, Spanner instance ID, Spanner database ID, and change stream name. See the Spanner documentation on how to create change streams.
- Make sure to create and configure a GCP service account with the proper credentials. The service account key must be provided in the connector configuration. For more information, see the GCP documentation on service accounts.
See the following section on how to configure a Debezium Spanner connector.
Deployment
With Zookeeper, Kafka, and Kafka Connect installed, the remaining tasks to deploy a Debezium Spanner connector are to download the connector’s plug-in archive, extract the JAR files into your Kafka Connect environment, and add the directory with the JAR files to Kafka Connect’s plugin.path. You then need to restart your Kafka Connect process to pick up the new JAR files.
If you are working with immutable containers, see Debezium’s Container images for Zookeeper, Kafka and Kafka Connect with the Spanner connector already installed and ready to run. You can also run Debezium on Kubernetes and OpenShift.
Connector configuration example
Following is an example of the configuration for a Spanner connector that connects to a change stream called changeStreamAll in the database Database in instance Instance and project Project.
You can choose to produce events for a subset of the schemas and tables. Optionally, ignore, mask, or truncate columns that are sensitive, too large, or not needed.
{
"name": "spanner-connector", (1)
"config": {
"connector.class": "io.debezium.connector.spanner.SpannerConnector", (2)
"gcp.spanner.change.stream": "changeStreamAll", (3)
"gcp.spanner.project.id": "Project", (4)
"gcp.spanner.instance.id": "Instance", (5)
"gcp.spanner.database.id": "Database", (6)
"gcp.spanner.credentials.json": <key.json>, (7)
"tasks.max": 1 (8)
}
}
1 | The name of the connector when registered with a Kafka Connect service. |
2 | The name of this Spanner connector class. |
3 | The change stream name. |
4 | The GCP Project ID. |
5 | The Spanner Instance ID. |
6 | The Spanner Database ID. |
7 | The GCP service account key JSON. |
8 | The maximum number of tasks. |
See the complete list of Spanner connector properties that can be specified in these configurations.
Adding connector configuration
To start running a Spanner connector, create a connector configuration and add the configuration to your Kafka Connect cluster.
Prerequisites:
- The Spanner change stream is created and available.
- The Spanner connector is installed.
Procedure:
- Create a configuration for the Spanner connector.
- Use the Kafka Connect REST API to add that connector configuration to your Kafka Connect cluster.
When the connector starts, it starts generating data change events for row-level operations and streaming change event records to Kafka topics.
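As an illustration of the REST step, the following Python sketch builds the HTTP request that registers a connector by posting to the Kafka Connect `/connectors` endpoint. The function name `build_register_request`, the URL `http://localhost:8083`, and the credentials path are assumptions for the example, and the configuration values mirror the sample configuration earlier in this document. The request is only constructed here; sending it is left as a commented-out line.

```python
import json
import urllib.request


def build_register_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a POST request that registers a connector with Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


# Values taken from the sample configuration; the key path is hypothetical.
config = {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.project.id": "Project",
    "gcp.spanner.instance.id": "Instance",
    "gcp.spanner.database.id": "Database",
    "gcp.spanner.credentials.path": "/path/to/key.json",
    "tasks.max": "1",
}

request = build_register_request("http://localhost:8083", "spanner-connector", config)
print(request.get_method(), request.full_url)

# To actually register the connector against a running Kafka Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```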
Monitoring
The Debezium Spanner connector provides only one type of metric in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect provide.
- Streaming metrics provide information about connector operations when the connector is capturing changes and streaming change event records.
Debezium monitoring documentation provides details for how to expose these metrics by using JMX.
Streaming metrics
The MBean is debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<Cloud Spanner.server.name>.
Attributes | Type | Description |
---|---|---|
|
The number of milliseconds since the connector has read and processed the most recent event. |
|
|
The total number of events that this connector has seen since last started or reset. |
|
|
The number of events that have been filtered by include/exclude list filtering rules configured on the connector. |
|
|
The length of the queue used to pass events between the streamer and the main Kafka Connect loop. |
|
|
The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop. |
|
|
Flag that denotes whether the connector is currently connected to the database server. |
|
|
The number of milliseconds between the last change event’s timestamp and the connector processing it. The values will incorporate any differences between the clocks on the machines where the database server and the connector are running. |
|
|
The number of processed transactions that were committed. |
|
|
The maximum buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop. |
|
|
The current buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop. |
|
|
The current low watermark of the connector task. The low watermark describes the time T at which the connector is guaranteed to have streamed out all events with timestamp < T. |
|
|
The current low watermark of the connector task in milliseconds. The low watermark describes the time T at which the connector is guaranteed to have streamed out all events with timestamp < T. |
|
|
The lag of the low watermark behind the current time in milliseconds. The low watermark describes the time T at which the connector is guaranteed to have streamed out all events with timestamp < T. |
|
|
The distribution of the lag of the low watermark behind the current time in milliseconds. The variant will include P50, P95, P99, Average, Min, Max calculations. |
|
|
The distribution of the Spanner commit timestamp to connector read latency. The variant will include P50, P95, P99, Average, Min, Max calculations. |
|
|
The distribution of the Spanner read timestamp to connector emit latency. The variant will include P50, P95, P99, Average, Min, Max calculations. |
|
|
The distribution of the Spanner commit timestamp to connector emit latency. The variant will include P50, P95, P99, Average, Min, Max calculations. |
|
|
The distribution of the Spanner commit timestamp to Kafka publish timestamp latency. The variant will include P50, P95, P99, Average, Min, Max calculations. |
|
|
The distribution of the connector emit timestamp to Kafka publish timestamp latency. The variant will include P50, P95, P99, Average, Min, Max calculations. |
|
|
The total Spanner event queue capacity. This queue indicates the total capacity of StreamEventQueue, a Spanner-specific queue that stores elements received from change stream queries. |
|
|
The remaining Spanner event queue capacity. |
|
|
The total task state change event queue capacity. This queue indicates the total capacity of the TaskStateChangeEventQueue, a Spanner-specific queue that stores events happening in the connector. |
|
|
The remaining task state change event queue capacity. |
|
|
The total number of partitions detected by the current task. |
|
|
The total number of change stream queries issued by the current task. |
|
|
The active number of change stream queries detected by the current task. |
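The low watermark guarantee described in the metrics above (all events with timestamp < T have been streamed out) is what lets a consumer put events back into timestamp order. The following Python sketch illustrates that idea on the consumer side. It is not connector code: the class name `WatermarkBuffer` is hypothetical, and it assumes that the low watermark is enabled, that each event value carries the low_watermark field shown in the source block of the examples, and that source.ts_ms is the timestamp the watermark is compared against.

```python
import heapq


class WatermarkBuffer:
    """Hold events until the low watermark passes them, then release them
    in timestamp order."""

    def __init__(self):
        self._heap = []
        self._counter = 0  # tie-breaker so event dicts are never compared

    def add(self, value: dict) -> list:
        """Buffer one event value and return every event that is now safe to emit."""
        source = value["source"]
        heapq.heappush(self._heap, (source["ts_ms"], self._counter, value))
        self._counter += 1
        return self._release(source["low_watermark"])

    def _release(self, low_watermark: int) -> list:
        ready = []
        # Everything strictly below the watermark can no longer be preceded
        # by an event that has not arrived yet.
        while self._heap and self._heap[0][0] < low_watermark:
            ready.append(heapq.heappop(self._heap)[2])
        return ready


def event(ts_ms: int, low_watermark: int) -> dict:
    """Build a minimal event value for the demonstration."""
    return {"source": {"ts_ms": ts_ms, "low_watermark": low_watermark}}


buffer = WatermarkBuffer()
print([e["source"]["ts_ms"] for e in buffer.add(event(100, 90))])   # []
print([e["source"]["ts_ms"] for e in buffer.add(event(95, 98))])    # [95]
print([e["source"]["ts_ms"] for e in buffer.add(event(120, 110))])  # [100]
```

A heap keeps insertion and release cheap even when many events are waiting on a slow partition.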
Connector configuration properties
The Debezium Spanner connector has many configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. Information about the properties is organized as follows:
The following configuration properties are required unless a default value is available.
Property | Default | Description |
---|---|---|
name | No default | Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors. |
connector.class | No default | The name of the Java class for the connector. Always use a value of io.debezium.connector.spanner.SpannerConnector for the Spanner connector. |
tasks.max | | The maximum number of tasks that should be created for this connector. The Spanner connector can use more than one task if you enable offset.storage.per.task mode. |
gcp.spanner.project.id | No default | The GCP project ID. |
gcp.spanner.instance.id | No default | The Spanner instance ID. |
gcp.spanner.database.id | No default | The Spanner database ID. |
gcp.spanner.change.stream | No default | The Spanner change stream name. |
gcp.spanner.credentials.path | No default | The file path to the GCP service account key JSON. |
gcp.spanner.credentials.json | No default | The GCP service account key JSON. Required if gcp.spanner.credentials.path is not provided. |
The following advanced configuration properties have defaults that work in most situations and therefore rarely need to be specified in the connector’s configuration.
Property | Default | Description |
---|---|---|
|
Whether or not the low watermark is enabled for the connector. |
|
|
The interval at which the low watermark is updated. |
|
|
The Spanner heartbeat interval. |
|
|
The connector start time. |
|
|
The connector end time. |
|
|
The Spanner event queue capacity. Increase this capacity if the remaining stream event queue capacity approaches zero during connector runtime. |
|
|
The task state change event queue capacity. Increase this capacity if the remaining task state change event queue capacity approaches zero during connector runtime. |
|
|
The maximum number of missed heartbeats for a change stream query before an exception is thrown |
|
|
Whether or not task autoscaling is enabled |
|
|
The name for the Sync topic. The Sync topic is an internal connector topic used to store communication between tasks. |
|
|
The poll duration for the sync topic. |
|
|
The timeout for requests to the sync topic. |
|
|
The timeout for publishing to the sync topic. |
|
|
The timeout for committing offsets for the sync topic. |
|
|
The interval at which offsets are committed for the sync topic. |
|
|
The interval at which messages are published to the sync topic. |
|
|
The name for the rebalancing topic. The rebalancing topic is an internal connector topic used to determine task aliveness. |
|
|
The poll duration for the rebalancing topic. |
|
|
The timeout for committing offsets for the rebalance topic. |
|
|
The interval at which offsets are committed for the rebalancing topic. |
|
|
The duration of time a task waits before processing a rebalancing event. |
For a more complete list of advanced configuration properties, see the connector source code on GitHub.
The connector also supports pass-through configuration properties that are used when creating the Kafka producer and consumer.
Be sure to consult the Kafka documentation for all of the configuration properties for Kafka producers and consumers. The Spanner connector does use the new consumer configuration properties.
Behavior when things go wrong
Debezium is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully then Debezium provides exactly once delivery of every change event record.
If a fault does happen then the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In these abnormal situations, Debezium, like Kafka, provides at least once delivery of change events.
The rest of this section describes how Debezium handles various kinds of faults and problems.
Configuration and startup errors
In the following situations, the connector fails when trying to start, reports an error/exception in the log, and stops running:
-
The connector’s configuration is invalid.
-
The connector cannot successfully connect to Spanner by using the specified connection parameters.
In these cases, the error message has details about the problem and possibly a suggested workaround. After you correct the configuration or address the Spanner problem, restart the connector.
Spanner becomes unavailable
When the connector is running, Spanner could become unavailable for any number of reasons. The connector will continue running and will be able to stream events once Spanner becomes available again.
Kafka Connect process stops gracefully
Suppose that Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully. Prior to shutting down that process, Kafka Connect migrates the process’s connector tasks to another Kafka Connect process in that group. The new connector tasks start processing exactly where the prior tasks stopped. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
Kafka Connect process crashes
If the Kafka Connect process stops unexpectedly, any connector tasks it was running terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, Kafka Connect restarts those connector tasks on other processes. However, Spanner connectors resume from the last offset that was recorded by the earlier processes. This means that the new replacement tasks might generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.
Because there is a chance that some events might be duplicated during a recovery from failure, consumers should always anticipate some duplicate events.
In each change event record, Debezium connectors insert source-specific information about the origin of the event, such as the originating partition token, the commit timestamp, the transaction ID, the record sequence, and the mod number. Consumers could use these identifiers for deduplication.
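A consumer-side deduplication step based on these identifiers might look like the following Python sketch. The names `dedup_key` and `Deduplicator` are hypothetical, and the choice of fields (table, server_transaction_id, sequence, and mod_number from the source block shown in the event examples) is an assumption about which combination identifies a change uniquely in your data, so verify it before relying on it.

```python
def dedup_key(value: dict) -> tuple:
    """Build an identity for a change event from its source metadata."""
    source = value["source"]
    return (
        source["table"],
        source["server_transaction_id"],
        source["sequence"],
        source["mod_number"],
    )


class Deduplicator:
    """Remember event identities and report whether an event is new.

    The set grows without bound in this sketch; a real consumer would expire
    old entries or persist them alongside its own state.
    """

    def __init__(self):
        self._seen = set()

    def is_new(self, value: dict) -> bool:
        key = dedup_key(value)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


first = {"op": "c", "source": {"table": "Users", "server_transaction_id": "tx-1",
                               "sequence": "1", "mod_number": 0}}
replayed = dict(first)  # the same event delivered again after a restart

dedup = Deduplicator()
print(dedup.is_new(first))     # True
print(dedup.is_new(replayed))  # False
```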
Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Periodically, at a frequency that you specify in the Kafka Connect configuration, Kafka Connect records the latest offset that appears in those change events. If the Kafka brokers become unavailable, the Kafka Connect process that is running the connectors repeatedly tries to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be re-established, at which point the connectors resume exactly where they left off.
Connector is stopped for a duration
If the connector is gracefully stopped, the database can continue to be used. When the connector restarts, it resumes streaming changes where it left off. That is, it generates change event records for all database changes that were made while the connector was stopped.
A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written according to Kafka best practices, and given enough resources a Kafka Connect connector can also handle very large numbers of database change events. Because of this, after being stopped for a while, when a Debezium connector restarts, it is very likely to catch up with the database changes that were made while it was stopped. How quickly this happens depends on the capabilities and performance of Kafka and the volume of changes being made to the data in Spanner.
Note that the connector can currently start streaming from at most about one hour in the past. The connector reads the information schema at its start timestamp to retrieve schema information. By default, Spanner cannot read the information schema at read timestamps older than the version retention period, which defaults to one hour. If you want to start the connector from earlier than one hour in the past, you need to increase the database’s version retention period.
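Before choosing a start timestamp, it can help to check it against the retention window. The following Python sketch does only that arithmetic. The function name `start_time_is_readable` is hypothetical, the one-hour default mirrors the default version retention period mentioned above, and the check does not query Spanner, so pass the retention period that is actually configured on your database.

```python
from datetime import datetime, timedelta, timezone


def start_time_is_readable(start_time: datetime, now: datetime,
                           retention: timedelta = timedelta(hours=1)) -> bool:
    """Return True if start_time is still inside the version retention window."""
    return now - start_time <= retention


now = datetime(2022, 12, 13, 18, 0, tzinfo=timezone.utc)
print(start_time_is_readable(now - timedelta(minutes=30), now))  # True
print(start_time_is_readable(now - timedelta(hours=3), now))     # False
print(start_time_is_readable(now - timedelta(hours=3), now,
                             retention=timedelta(days=1)))       # True
```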
Limitations
- The connector does not support streaming snapshot events.
- If watermarking is enabled in the connector, you cannot configure Debezium topic routing transformations.
- This connector currently does not support the PostgreSQL interface for Cloud Spanner.