Debezium connector for Cloud Spanner

Debezium’s Cloud Spanner connector consumes and outputs Spanner change streams data into a Kafka topic.

A Spanner change stream watches and streams out a Spanner database’s data changes - inserts, updates, deletes - in near-real-time. The Spanner connector abstracts away the details of querying the Spanner change streams. With this connector, you don’t have to manage the change streams partition lifecycle, which is necessary when you use the Spanner API directly.

The connector does not support the snapshot feature at the moment. The first time the Kafka connector connects to a Spanner database, it will stream changes from either the provided timestamp, or the current timestamp if a timestamp is not provided.

Overview

To read and process database changes, the connector queries from a change stream.

The connector produces a change event for every row-level insert, update, and delete operation that was captured and sends change event records for each table in a separate Kafka topic. Client applications read the Kafka topics that correspond to the database tables of interest, and can react to every row-level event they receive from those topics.

The connector is tolerant of failures. As the connector reads changes and produces events, it records the last commit timestamp processed for each change stream partition. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart the connector continues streaming records where it last left off.

How the connector works

To optimally configure and run a Debezium Spanner connector, it is helpful to understand how the connector streams change events, determines Kafka topic names, and uses metadata.

Streaming changes

The Debezium Spanner connector spends all its time streaming changes from the change stream to which it is subscribed. When a change occurs on a table, Spanner writes a corresponding change stream record in the database, synchronously in the same transaction as the data change. To scale change streams writes and reads, Spanner splits and merges the internal change stream storage along with the database data. To support reading change stream records in near real-time as database writes scale, the Spanner API is designed for a change stream to be queried concurrently using the change stream partitions. See about Spanner change stream’s partitioning model.

The connector offers an abstraction over the Spanner API for querying change streams. With this connector, you don’t have to manage the change stream partition lifecycle. The connector provides you with a stream of data change records so that you are free to focus more on application logic and less on specific API details and dynamic change stream partitioning.

When subscribing to a change stream, the connector needs to provide the the project ID, Spanner instance ID, Spanner database ID, as well as the change stream name. Optionally, the user can also provide a start timestamp and an end timestamp. See the this section for a more detailed list of the connector configuration properties.

When the connector receives changes it transforms the events into Debezium create, update, or delete events. The connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.

Periodically, Kafka Connect records the most recent offset in another Kafka topic. The offset indicates source-specific position information that Debezium includes with each event. For the Spanner connector, the last processed commit timestamps for the change stream partitions is the offset.

When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset.

The Spanner connector also records metadata information in the following metadata topics during streaming. It is not advisable to modify the content or the configuration of these topics:

  • _consumer_offsets: A topic automatically created by Kafka. Stores consumer offsets for consumers created in the Kafka connector

  • _kafka-connect-offsets: A topic automatically created by Kafka Connect. Stores the connector offsets.

  • _sync_topic_spanner_connector_connectorname: A topic automatically created by the connector. Stores metadata regarding change stream partitions.

  • _rebalancing_topic_spanner_connector_connectorname: A topic automatically created by the connector. Used to determine connector task aliveness.

  • _debezium-heartbeat.connectorname: A topic used to process Spanner change stream heartbeats.

Topic names

The Spanner connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name is topicPrefix.connectorName.tableName where:

  • topicPrefix is the topic prefix as specified by the topic.prefix connector configuration property.

  • connectorName is the user-specified name of the connector.

  • tableName is the name of the database table in which the operation occurred.

For example, suppose that spanner is the logical name in the configuration for a connector that is capturing changes from a Spanner change stream that tracks changes in a database that contains four tables: table1, table2, table3, and table4. The connector would stream records to these four Kafka topics:

  • spanner.table1

  • spanner.table2

  • spanner.table3

  • spanner.table4

Data change events

The Debezium Spanner connector generates a data change event for each row-level INSERT, UPDATE, and DELETE operation. Each event contains a key and a value. The key and value are separate documents. The structure of the key and the value depends on the table that was changed.

Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained. The schema for the key will never change. Please note that the schema for the value is an amalgamation of all the columns in the table that the change stream has tracked since the start time of the connector.

The following skeleton JSON documents show the basic structure for the key and value documents. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of the key and value documents. A schema field is in a change event key or change event value only when you configure the converter to produce it. Likewise, the event key and event payload are present only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce schemas, change events have this structure:

// Key
{
 "schema": { (1)
   ...
  },
 "payload": { (2)
   ...
 }
}

// Value
{
 "schema": { (3)
   ...
 },
 "payload": { (4)
   ...
 }
}
Table 1. Overview of change event basic content
Item Field name Description

1

schema

The first schema field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key’s payload portion. In other words, the first schema field describes the structure of the primary key.

2

payload

The first payload field is part of the event key. It has the structure described by the previous schema field and it contains the key for the row that was changed.

3

schema

The second schema field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value’s payload portion. In other words, the second schema describes the structure of the row that was changed. Typically, this schema contains nested schemas.

4

payload

The second payload field is part of the event value. It has the structure described by the previous schema field and it contains the actual data for the row that was changed.

The default behavior is that the connector streams change event records to topics with names that are the same as the event’s originating table.

Starting with Kafka 0.10, Kafka can optionally record the event key and value with the timestamp at which the message was created (recorded by the producer) or written to the log by Kafka.

Change event keys

For a given table, the change event’s key has a structure that contains a field for each column in the primary key of the table at the time the event was created. All key columns will be marked as non-optional.

Consider a users table defined in the business database and the example of a change event key for that table.

Example table
CREATE TABLE Users (
  id INT64 NOT NULL,
  username STRING(MAX) NOT NULL,
  password STRING(MAX) NOT NULL,
  email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
Example change event key

Every change event for the users table while it has this definition has the same key structure, which in JSON looks like this:

{
  "schema": { (1)
    "type": "struct",
    "name": "Users.Key", (2)
    "optional": false, (3)
    "fields": [ (4)
      {
        "type": "int64",
        "optional": "false",
        "field": "false"
      }
    ]
  },
  "payload": { (5)
      "id": "1"
  },
}
Table 2. Description of change event key
Item Field name Description

1

schema

The schema portion of the key specifies a Kafka Connect schema that describes what is in the key’s payload portion.

2

Users.Key

Name of the schema that defines the structure of the key’s payload. This schema describes the structure of the primary key for the table that was changed.

3

optional

Indicates whether the event key must contain a value in its payload field. Primary key columns are always required.

4

fields

Specifies each field that is expected in the payload, including each field’s name, type, and whether it is optional.

5

payload

Contains the key for the row for which this change event was generated. In this example, the key, contains a single id field whose value is 1.

Change event values

Consider the same sample table that was used to show an example of a change event key:

CREATE TABLE Users (
  id INT64 NOT NULL,
  username STRING(MAX) NOT NULL,
  password STRING(MAX) NOT NULL,
  email STRING(MAX) NOT NULL)
PRIMARY KEY (id);

create events

The following example shows the value portion of a change event that the connector generates for an operation that creates data in the Users table. If a Spanner column is marked is non-optional, its value is required for all mutations that insert a row. All primary key columns in Spanner will be marked as non-optional. Note that even if a non-key column is marked as non-optional in Spanner, it will be shown as optional in the schema. Only primary key columns are marked as non-optional in the schema.

{
    "schema": { (1)
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Users.Value", (2)
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Users.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ns"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "project_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "instance_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "database_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "change_stream_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "table"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "server_transaction_id"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "low_watermark"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "read_at_timestamp"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "number_of_records_in_transaction"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "transaction_tag"
                    }
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "system_transaction"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "value_capture_type"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "partition_token"
                    }
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "mod_number"
                    }
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "is_last_record_in_transaction_in_partition"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "number_of_partitions_in_transaction"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.spanner.Source", (3)
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "connector_name.Users.Envelope" (4)
    },
    "payload": { (5)
        "before": null, (6)
        "after": { (7)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (8)
            "version": "3.0.5.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "c", (9)
        "ts_ms": 1559033904863, (10)
        "ts_us": 1559033904863769, (10)
        "ts_ns": 1559033904863769841 (10)
    }
}
Table 3. Descriptions of create event value fields
Item Field name Description

1

schema

The value’s schema, which describes the structure of the value’s payload. A change event’s value schema is the same in every change event that the connector generates for a particular table.

2

name

In the schema section, each name field specifies the schema for a field in the value’s payload.

3

name

io.debezium.connector.spanner.Source is the schema for the payload’s source field. This schema is specific to the Spanner connector. The connector uses it for all events that it generates.

4

name

connector_name.Users.Envelope is the schema for the overall structure of the payload, where connector_name is the connector name, and customers is the table.

5

payload

The value’s actual data. This is the information that the change event is providing.

6

before

An optional field that specifies the state of the row before the event occurred. When the op field is c for create, as it is in this example, the before field is null since this change event is for new content.

7

after

An optional field that specifies the state of the row after the event occurred. In this example, the after field contains the values of the new row’s id, first_name, last_name, and email columns.

8

source

Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:

  • Debezium version

  • Connector type and name

  • Database and table that contains the new row

  • If the event was part of a snapshot

  • The record sequence number for the data change event in the transaction

  • The project ID

  • The instance ID

  • The database ID

  • The change stream name

  • The transaction ID

  • The low watermark, which represents the timestamp at which all records with commit timestamp less than the low watermark timestamp have already been streamed out by the connector

  • The commit timestamp for when the change was made in the database

  • The timestamp at which the connnector processed the change

  • The number of records in the originating transaction

  • The transaction tag

  • Whether or not the transaction was a system transaction

  • The value capture type

  • The originating partition token used to query this change event

  • The mod number in the original data change event received from Spanner

  • Whether or not the data change event was the last record in the transaction in the partition

  • The total number of change stream partitions in the transaction

9

op

Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a row. Valid values are:

  • c = create

  • u = update

  • d = delete

10

ts_ms, ts_us, ts_ns

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

update events

The value of a change event for an update in the sample Users table has the same schema as a create event for that table. Likewise, the event value’s payload has the same structure. However, the event value payload contains different values in an update event. Here is an example of a change event value in an event that the connector generates for an update in the Users table:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": { (2)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (3)
            "version": "3.0.5.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "u", (4)
        "ts_ms": 1465584025523,  (5)
        "ts_us": 1465584025523614,  (5)
        "ts_ns": 1465584025523614723  (5)
    }
}
Table 4. Descriptions of update event value fields
Item Field name Description

1

before

An optional field that contains all values of all columns that were in the row before the database commit.

2

after

An optional field that specifies the state of the row after the event occurred. In this example, the first_name value is now Anne Marie.

3

source

Mandatory field that describes the source metadata for the event. The source field structure has the same fields as in a create event, but some values are different. The source metadata includes:

  • Debezium version

  • Connector type and name

  • Database (a.k.a keyspace) and table that contains the new row

  • If the event was part of a snapshot

  • The record sequence number for the data change event in the transaction

  • The project ID

  • The instance ID

  • The database ID

  • The change stream name

  • The transaction ID

  • The low watermark, which represents the timestamp at which all records with commit timestamp less than the low watermark timestamp have already been streamed out by the connector

  • The commit timestamp for when the change was made in the database

  • The timestamp at which the connnector processed the change

  • The number of records in the originating transaction

  • The transaction tag

  • Whether or not the transaction was a system transaction

  • The value capture type

  • The originating partition token used to query this change event

  • The mod number in the original data change event received from Spanner

  • Whether or not the data change event was the last record in the transaction in the partition

  • The total number of change stream partitions in the transaction

4

op

Mandatory string that describes the type of operation. In an update event value, the op field value is u, signifying that this row changed because of an update.

5

ts_ms, ts_us, ts_ns

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

delete events

The value in a delete change event has the same schema portion as create and update events for the same table. The payload portion in a delete event for the sample Users table looks like this:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": null, (2)
        "source": { (3)
            "version": "3.0.5.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "d", (4)
        "ts_ms": 1465581902461, (5)
        "ts_us": 1465581902461425, (5)
        "ts_ns": 1465581902461425378 (5)
    }
}
Table 5. Descriptions of delete event value fields
Item Field name Description

1

before

Optional field that specifies the state of the row before the event occurred. In a delete event value, the before field contains the values that were in the row before it was deleted with the database commit.

2

after

Optional field that specifies the state of the row after the event occurred. In a delete event value, the after field is null, signifying that the row no longer exists.

3

source

Mandatory field that describes the source metadata for the event. In a delete event value, the source field structure is the same as for create and update events for the same table. Many source field values are also the same. In a delete event value, the ts_ms and lsn field values, as well as other values, might have changed. But the source field in a delete event value provides the same metadata:

  • Debezium version

  • Connector type and name

  • Database (a.k.a keyspace) and table that contains the new row

  • If the event was part of a snapshot

  • The record sequence number for the data change event in the transaction

  • The project ID

  • The instance ID

  • The database ID

  • The change stream name

  • The transaction ID

  • The low watermark, which represents the timestamp at which all records with commit timestamp less than the low watermark timestamp have already been streamed out by the connector

  • The commit timestamp for when the change was made in the database

  • The timestamp at which the connnector processed the change

  • The number of records in the originating transaction

  • The transaction tag

  • Whether or not the transaction was a system transaction

  • The value capture type

  • The originating partition token used to query this change event

  • The mod number in the original data change event received from Spanner

  • Whether or not the data change event was the last record in the transaction in the partition

  • The total number of change stream partitions in the transaction

4

op

Mandatory string that describes the type of operation. The op field value is d, signifying that this row was deleted.

5

ts_ms, ts_us, ts_ns

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

A delete change event record provides a consumer with the information it needs to process the removal of this row.

Spanner connector events are designed to work with Kafka log compaction. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state. Note that if the low watermark is enabled, compaction should not be enabled.

Tombstone events

When a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be null. To make this possible, the connector follows a delete event with a special tombstone event that has the same key but a null value.

Data type mappings

The Spanner connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the Spanner data type of the column. This section describes these mappings.

Spanner types

Table 6. Mappings for Spanner data types
Spanner data type Literal type (schema type)

BOOLEAN

BOOLEAN

INT64

INT64

ARRAY

ARRAY

BYTES

BYTES

STRING

STRING

FLOAT32

FLOAT

FLOAT64

DOUBLE

NUMERIC

STRING

TIMESTAMP

STRING

NUMERIC

STRING

Setting up Spanner

Checklist
  • Make sure to provide a project ID, Spanner instance ID, Spanner database ID, and change stream name. See documentation on how to create change streams.

  • Make sure to create and configure a GCP service account with the proper credentials. The service account key will need to be provided in the connector configuration. See more information about service accounts here.

See the following section on how to configure a Debezium Spanner connector.

Deployment

With Zookeeper, Kafka, and Kafka Connect installed, the remaining tasks to deploy a Debezium Spanner connector are to download the connector’s plug-in archive, extract the JAR files into your Kafka Connect environment, and add the directory with the JAR files to Kafka Connect’s plugin.path. You then need to restart your Kafka Connect process to pick up the new JAR files.

If you are working with immutable containers, see Debezium’s Container images for Zookeeper, Kafka and Kafka Connect with the Spanner connector already installed and ready to run. You can also run Debezium on Kubernetes and OpenShift.

Connector configuration example

Following is an example of the configuration for a Spanner connector that connects to a change stream called changeStreamAll in the database Database in instance Instance and project Project.

You can choose to produce events for a subset of the schemas and tables. Optionally, ignore, mask, or truncate columns that are sensitive, too large, or not needed.

{
  "name": "spanner-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector", (2)
    "gcp.spanner.change.stream": "changeStreamAll", (3)
    "gcp.spanner.project.id": "Project", (4)
    "gcp.spanner.instance.id": "Instance", (5)
    "gcp.spanner.database.id": "Database", (6)
    "gcp.spanner.credentials.json": <key.json>, (7)
    "tasks.max": 1 (8)
  }
}
1 The name of the connector when registered with a Kafka Connect service.
2 The name of this Spanner connector class.
3 The change stream name.
4 The GCP Project ID.
5 The Spanner Instance ID.
6 The Spanner Database ID.
7 The GCP service account key JSON.
8 The maximum number of tasks.

See the complete list of Spanner connector properties that can be specified in these configurations.

Adding connector configuration

To start running a Spanner connector, create a connector configuration and add the configuration to your Kafka Connect cluster.

Prerequisites
  • The Spanner change stream is created and available.

  • The Spanner connector is installed.

Procedure
  1. Create a configuration for the Spanner connector.

  2. Use the Kafka Connect REST API to add that connector configuration to your Kafka Connect cluster.

Results

When the connector starts, it starts generating data change events for row-level operations and streaming change event records to Kafka topics.

Monitoring

The Debezium Spanner connector provides only one type of metric in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect provide.

  • Streaming metrics provide information about connector operations when the connector is capturing changes and streaming change event records.

Debezium monitoring documentation provides details for how to expose these metrics by using JMX.

Customized MBean names

Debezium connectors expose metrics via the MBean name for the connector. These metrics, which are specific to each connector instance, provide data about the behavior of the connector’s snapshot, streaming, and schema history processes.

By default, when you deploy a correctly configured connector, Debezium generates a unique MBean name for each of the different connector metrics. To view the metrics for a connector process, you configure your observability stack to monitor its MBean. But these default MBean names depend on the connector configuration; configuration changes can result in changes to the MBean names. A change to the MBean name breaks the linkage between the connector instance and the MBean, disrupting monitoring activity. In this scenario, you must reconfigure the observability stack to use the new MBean name if you want to resume monitoring.

To prevent monitoring disruptions that result from MBean name changes, you can configure custom metrics tags. You configure custom metrics by adding the custom.metric.tags property to the connector configuration. The property accepts key-value pairs in which each key represents a tag for the MBean object name, and the corresponding value represents the value of that tag. For example: k1=v1,k2=v2. Debezium appends the specified tags to the MBean name of the connector.

After you configure the custom.metric.tags property for a connector, you can configure the observability stack to retrieve metrics associated with the specified tags. The observability stack then uses the specified tags, rather than the mutable MBean names to uniquely identify connectors. Later, if Debezium redefines how it constructs MBean names, or if the topic.prefix in the connector configuration changes, metrics collection is uninterrupted, because the metrics scrape task uses the specified tag patterns to identify the connector.

A further benefit of using custom tags, is that you can use tags that reflect the architecture of your data pipeline, so that metrics are organized in a way that suits you operational needs. For example, you might specify tags with values that declare the type of connector activity, the application context, or the data source, for example, db1-streaming-for-application-abc. If you specify multiple key-value pairs, all of the specified pairs are appended to the connector’s MBean name.

The following example illustrates how tags modify the default MBean name.

Example 1. How custom tags modify the connector MBean name

By default, the Spanner connector uses the following MBean name for streaming metrics:

debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>

If you set the value of custom.metric.tags to database=salesdb-streaming,table=inventory, Debezium generates the following custom MBean name:

debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

Streaming metrics

The MBean is debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>.

Attributes Type Description

long

The number of milliseconds since the connector has read and processed the most recent event.

long

The total number of events that this connector has seen since last started or reset.

long

The number of events that have been filtered by include/exclude list filtering rules configured on the connector.

int

The length the queue used to pass events between the streamer and the main Kafka Connect loop.

int

The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop.

boolean

Flag that denotes whether the connector is currently connected to the database server.

long

The number of milliseconds between the last change event’s timestamp and the connector processing it. The values will incorporate any differences between the clocks on the machines where the database server and the connector are running.

long

The number of processed transactions that were committed.

long

The maximum buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop.

long

The current buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop.

long

The current low watermark of the connector task. The low watermark describes the time T at which the connector is guaranteed to have streamed out all events with timestamp < T.

long

The current low watermark of the connector task in milliseconds. The low watermark describes the time T at which the connector is guaranteed to have streamed out all events with timestamp < T.

long

The lag of the low watermark behind the current time in milliseconds. The low watermark describes the time T at which the connector is guaranteed to have streamed out all events with timestamp < T.

long

The distribution of the lag of the low watermark behind the current time in milliseconds. The variant will include P50, P95, P99, Average, Min, Max calculations.

long

The distribution of the Spanner commit timestamp to connector read latency. The variant will include P50, P95, P99, Average, Min, Max calculations.

long

The distribution of the Spanner read timestamp to connector emit latency. The variant will include P50, P95, P99, Average, Min, Max calculations.

long

The distribution of the Spanner commit timestamp to connector emit latency. The variant will include P50, P95, P99, Average, Min, Max calculations.

long

The distribution of the Spanner commit timestamp to Kafka publish timestamp latency. The variant will include P50, P95, P99, Average, Min, Max calculations.

long

The distribution of the connector emit timestamp to Kafka publish timestamp latency. The variant will include P50, P95, P99, Average, Min, Max calculations.

long

The total Spanner event queue capacity. This queue indicates the total capacity of StreamEventQueue, a Spanner-specific queue that stores elements received from change stream queries.

long

The remaining Spanner event queue capacity.

long

The total task state change event queue capacity. This queue indicates the total capacity of the TaskStateChangeEventQueue, a Spanner-specific queue that stores events happening in the connector.

long

The remaining task state change event queue capacity.

long

The total number of partitions detected by the current task.

long

The total number of change stream queries issued by the current task.

long

The active number of change stream queries detected by the current task.

Connector configuration properties

The Debezium Spanner connector has many configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. Information about the properties is organized as follows:

The following configuration properties are required unless a default value is available.

Table 7. Required connector configuration properties
Property Default Description

No default

Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors.

No default

The name of the Java class for the connector. Always use a value of io.debezium.connector.spanner.SpannerConnector for the Spanner connector.

1

The maximum number of tasks that should be created for this connector. The Spanner connector can use more than 1 tasks if you enable offset.storage.per.task mode.

No default

The GCP project ID

No default

The Spanner instance ID

No default

The Spanner database ID

No default

The Spanner change stream

No default

The file path to the GCP service account key JSON.

No default

The GCP service account key JSON. Required if gcp.spanner.credentials.path is not provided.

none

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings:

  • none does not apply any adjustment.

  • avro replaces the characters that cannot be used in the Avro type name with underscore.

  • avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java

none

Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings:

  • none does not apply any adjustment.

  • avro replaces the characters that cannot be used in the Avro type name with underscore.

  • avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java

See Avro naming for more details.

The following advanced configuration properties have defaults that work in most situations and therefore rarely need to be specified in the connector’s configuration.

Table 8. Advanced connector configuration properties
Property Default Description

false

Whether or not the low watermark is enabled for the connector.

1000 ms

The interval at which the low watermark is updated.

300000

The Spanner heartbeat interval.

current time

The connector start time.

indefinite end time

The connector end time.

10000

The Spanner event queue capacity. Increase this capacity if the remaining stream event queue capacity approaches zero during connector runtime.

1000

The task state change event queue capacity. Increase this capacity if the remaining task state change event queue capacity approaches zero during connector runtime.

5

The maximum number of missed heartbeats for a change stream query before an exception is thrown

false

Whether or not task autoscaling is enabled

sync_topic_spanner_connector<connectorname>

The name for the Sync topic. The Sync topic is an internal connector topic used to store communication between tasks.

500 ms

The poll duration for the sync topic.

5000 ms

The timeout for requests to the sync topic.

15000 ms

The timeout for publishing to the sync topic.

5000 ms

The timeout for committing offsets for the sync topic.

60000 ms

The interval at which offsets are committed for the sync topic.

5 ms

The interval at which messages are published to the sync topic.

rebalancing_topic_spanner_connector<connectorname>

The name for the rebalancing topic. The rebalancing topic is an internal connector topic used to determine task aliveness.

5000

The poll duration for the rebalancing topic.

5000

The timeout for committing offsets for the rebalance topic.

60000 ms

The interval at which offsets are committed for the sync topic.

1000 ms

The duration of time a task waits before processing a rebalancing event.

No default

Defines tags that customize MBean object names by adding metadata that provides contextual information. Specify a comma-separated list of key-value pairs. Each key represents a tag for the MBean object name, and the corresponding value represents a value for the key, for example,
k1=v1,k2=v2

The connector appends the specified tags to the base MBean object name. Tags can help you to organize and categorize metrics data. You can define tags to identify particular application instances, environments, regions, versions, and so forth. For more information, see Customized MBean names.

-1

Specifies how the connector responds after an operation that results in a retriable error, such as a connection error.
Set one of the following options:

-1

No limit. The connector always restarts automatically, and retries the operation, regardless of the number of previous failures.

0

Disabled. The connector fails immediately, and never retries the operation. User intervention is required to restart the connector.

> 0

The connector restarts automatically until it reaches the specified maximum number of retries. After the next failure, the connector stops, and user intervention is required to restart it.

For a more complete list of advanced configuration, see Github code.

Pass-through connector configuration properties

The connector also supports pass-through configuration properties that are used when creating the Kafka producer and consumer.

Be sure to consult the Kafka documentation for all of the configuration properties for Kafka producers and consumers. The Spanner connector does use the new consumer configuration properties.

Behavior when things go wrong

Debezium is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully then Debezium provides exactly once delivery of every change event record.

If a fault does happen then the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In these abnormal situations, Debezium, like Kafka, provides at least once delivery of change events.

The rest of this section describes how Debezium handles various kinds of faults and problems.

Configuration and startup errors

In the following situations, the connector fails when trying to start, reports an error/exception in the log, and stops running:

  • The connector’s configuration is invalid.

  • The connector cannot successfully connect to Spanner by using the specified connection parameters.

In these cases, the error message has details about the problem and possibly a suggested workaround. After you correct the configuration or address the Spanner problem, restart the connector.

Spanner becomes unavailable

When the connector is running, Spanner could become unavailable for any number of reasons. The connector will continue running and will be able to stream events once Spanner becomes available again.

Kafka Connect process stops gracefully

Suppose that Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully. Prior to shutting down that process, Kafka Connect migrates the process’s connector tasks to another Kafka Connect process in that group. The new connector tasks start processing exactly where the prior tasks stopped. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.

Kafka Connect process crashes

If the Kafka Connector process stops unexpectedly, any connector tasks it was running terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, Kafka Connect restarts those connector tasks on other processes. However, Spanner connectors resume from the last offset that was recorded by the earlier processes. This means that the new replacement tasks might generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.

Because there is a chance that some events might be duplicated during a recovery from failure, consumers should always anticipate some duplicate events.

In each change event record, Debezium connectors insert source-specific information about the origin of the event, such as the originating partition token, the commit timestamp, the transaction ID, the record sequence, and the mod number. Consumers could use these identifiers for deduplication.

Kafka becomes unavailable

As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Periodically, at a frequency that you specify in the Kafka Connect configuration, Kafka Connect records the latest offset that appears in those change events. If the Kafka brokers become unavailable, the Kafka Connect process that is running the connectors repeatedly tries to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be re-established, at which point the connectors resume exactly where they left off.

Connector is stopped for a duration

If the connector is gracefully stopped, the database can continue to be used. When the connector restarts, it resumes streaming changes where it left off. That is, it generates change event records for all database changes that were made while the connector was stopped.

A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written according to Kafka best practices, and given enough resources a Kafka Connect connector can also handle very large numbers of database change events. Because of this, after being stopped for a while, when a Debezium connector restarts, it is very likely to catch up with the database changes that were made while it was stopped. How quickly this happens depends on the capabilities and performance of Kafka and the volume of changes being made to the data in Spanner.

Note that the current connector can only be streamed back in time by ~one hour. The Kafka connector reads the information schema at the Kafka connector’s start timestamp to retrieve schema information. By default, Spanner cannot read the information schema at read timestamps before the version retention period, which defaults to one hour. If you wants to start the connector from earlier than one hour into the past, you will need to increase the database’s version retention period.

Limitations