Overview of how the MySQL connector works

The Debezium MySQL connector tracks the structure of the tables, performs snapshots, transforms binlog events into Debezium change events, and records in Kafka how far it has progressed in the binlog.

How the MySQL connector uses database schemas

When a database client queries a database, it uses the database's current schema. Because a database schema can change at any time, the Debezium MySQL connector must know what the schema looked like at the time of each INSERT, UPDATE, and DELETE operation.

MySQL writes both row-level changes and DDL statements to its binlog. The connector reads the DDL statements, parses them, and updates an in-memory representation of each table's schema. It uses this representation to determine the table structure at the time of each operation, so that it can produce accurate change events.

The connector records all DDL statements, along with the position in the binlog where each one appeared, in a separate database history. When the connector restarts (after a crash or a graceful shutdown), it resumes reading the binlog from a specific position and rebuilds the table schemas as they existed at that point by replaying the recorded DDL statements up to that position.
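To make the replay idea concrete, here is a minimal Java sketch of a database history, assuming a simple in-memory list. The class DatabaseHistorySketch and its methods are hypothetical and do not reflect how Debezium actually persists the history (for example, in a Kafka topic). The sketch only shows that replaying the DDL recorded up to the resume position reconstructs the schema known at that position. It also assumes that binlog file names end in a zero-padded sequence number, so that comparing names lexically orders the files.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a database history: DDL plus binlog position. */
class DatabaseHistorySketch {

    /** A binlog position: file name plus byte offset within that file. */
    static final class BinlogPosition implements Comparable<BinlogPosition> {
        final String file;
        final long pos;

        BinlogPosition(String file, long pos) {
            this.file = file;
            this.pos = pos;
        }

        @Override
        public int compareTo(BinlogPosition other) {
            // Assumption: names such as mysql-bin.000003 share a prefix and a
            // zero-padded sequence number, so lexical order matches file order.
            int byFile = file.compareTo(other.file);
            return byFile != 0 ? byFile : Long.compare(pos, other.pos);
        }
    }

    private static final class Entry {
        final BinlogPosition position;
        final String ddl;

        Entry(BinlogPosition position, String ddl) {
            this.position = position;
            this.ddl = ddl;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Appends a DDL statement seen at the given binlog position. */
    void record(BinlogPosition position, String ddl) {
        entries.add(new Entry(position, ddl));
    }

    /**
     * Returns, in recorded order, every DDL statement at or before the position
     * from which the connector resumes; replaying them rebuilds the schema.
     */
    List<String> ddlUpTo(BinlogPosition resumeFrom) {
        List<String> result = new ArrayList<>();
        for (Entry e : entries) {
            if (e.position.compareTo(resumeFrom) <= 0) {
                result.add(e.ddl);
            }
        }
        return result;
    }
}
```

A statement recorded after the resume position (for example a later DROP TABLE) is not replayed, because the binlog events that follow it have not been processed yet.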
See The MySQL connector and Kafka topics for more on topic naming conventions.

How the MySQL connector performs database snapshots

When your Debezium MySQL connector is first started, it performs an initial consistent snapshot of your database. The following flow describes how this snapshot is completed.

This flow describes the default snapshot mode, initial, which is set in the snapshot.mode property. For other snapshot modes, see the MySQL connector configuration properties.
The connector performs the following steps:

  1. Grabs a global read lock that blocks writes by other database clients.
     The snapshot transaction alone would not prevent other clients from applying DDL that might interfere with the connector's attempt to read the binlog position and table schemas. The connector therefore holds the global read lock while it reads the binlog position and schemas, and releases it in a later step.

  2. Starts a transaction with repeatable read semantics to ensure that all subsequent reads within the transaction are done against the consistent snapshot.

  3. Reads the current binlog position.

  4. Reads the schema of the databases and tables allowed by the connector's configuration.

  5. Releases the global read lock, which allows other database clients to write to the database again.

  6. If applicable, writes the DDL changes to the schema change topic, including all necessary DROP... and CREATE... DDL statements.

  7. Scans the database tables and, for each row, generates a read event (op value r) on the relevant table-specific Kafka topic.

  8. Commits the transaction.

  9. Records the completed snapshot in the connector offsets.
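The global-read-lock steps map naturally onto ordinary MySQL statements. The following Java sketch only builds the ordered statement list for that flow; it assumes standard statements (FLUSH TABLES WITH READ LOCK, START TRANSACTION WITH CONSISTENT SNAPSHOT, SHOW MASTER STATUS, and so on) as one plausible implementation, not necessarily the exact statements the connector issues. The class name is hypothetical, and the steps that involve only Kafka appear as comments.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: the global-read-lock snapshot steps as MySQL statements. */
class GlobalLockSnapshotSketch {

    static List<String> statements(List<String> tables) {
        List<String> sql = new ArrayList<>();
        sql.add("FLUSH TABLES WITH READ LOCK");                              // 1. block writes
        sql.add("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");  // 2. repeatable read...
        sql.add("START TRANSACTION WITH CONSISTENT SNAPSHOT");               //    ...consistent snapshot
        sql.add("SHOW MASTER STATUS");                                       // 3. binlog file and position
        for (String table : tables) {
            sql.add("SHOW CREATE TABLE " + table);                           // 4. table schemas
        }
        sql.add("UNLOCK TABLES");                                            // 5. writers may proceed
        // 6. DDL is written to the schema change topic (Kafka only, no SQL)
        for (String table : tables) {
            sql.add("SELECT * FROM " + table);                               // 7. one read event per row
        }
        sql.add("COMMIT");                                                   // 8. end the snapshot transaction
        // 9. the completed snapshot is recorded in the offsets (Kafka only, no SQL)
        return sql;
    }
}
```

This ordering relies on a documented MySQL behavior: starting a transaction does not release a global read lock acquired with FLUSH TABLES WITH READ LOCK, so the lock still covers steps 3 and 4, while the table scans in step 7 run after UNLOCK TABLES inside the still-open consistent-snapshot transaction.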

What happens if the connector fails?

If the connector fails, stops, or is rebalanced while making the initial snapshot, the connector creates a new snapshot once restarted. Once that initial snapshot is completed, the Debezium MySQL connector restarts from the binlog position that it recorded during the snapshot, so it does not miss any updates.

If the connector is stopped for long enough, MySQL could purge old binlog files and the connector's position would be lost. If the position is lost, the connector reverts to performing an initial snapshot to establish its starting position. For more tips on troubleshooting the Debezium MySQL connector, see [connector-common-issues].
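One way to detect the situation described above is to compare the binlog file recorded in the connector offsets with the files the server still has, as listed by SHOW BINARY LOGS. The sketch below is a hypothetical illustration of that check, not the connector's code; the class and method names are invented for this example.

```java
import java.util.List;

/** Hypothetical check: can the connector resume, or was its binlog file purged? */
class BinlogResumeCheck {

    /**
     * @param recordedFile   binlog file name stored in the connector offsets
     * @param availableFiles file names still on the server (e.g. from SHOW BINARY LOGS)
     */
    static boolean mustResnapshot(String recordedFile, List<String> availableFiles) {
        // If the file holding the last recorded position is gone, the events
        // between that position and the oldest remaining file are lost.
        return !availableFiles.contains(recordedFile);
    }
}
```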

What if Global Read Locks are not allowed?

Some environments do not allow a global read lock. If the Debezium MySQL connector detects that global read locks are not permitted, it uses table-level locks instead and performs the snapshot as follows.

The database user must have the LOCK TABLES privilege.
The connector performs the following steps:

  1. Starts a transaction with repeatable read semantics to ensure that all subsequent reads within the transaction are done against the consistent snapshot.

  2. Reads and filters the names of the databases and tables.

  3. Reads the current binlog position.

  4. Reads the schema of the databases and tables allowed by the connector's configuration.

  5. If applicable, writes the DDL changes to the schema change topic, including all necessary DROP... and CREATE... DDL statements.

  6. Scans the database tables and, for each row, generates a read event (op value r) on the relevant table-specific Kafka topic.

  7. Commits the transaction.

  8. Releases the table-level locks.

  9. Records the completed snapshot in the connector offsets.

How the MySQL connector handles schema change topics

You can configure the Debezium MySQL connector to produce schema change events that include all DDL statements applied to databases in the MySQL server. The connector writes all of these events to a Kafka topic named <serverName> where serverName is the name of the connector as specified in the database.server.name configuration property.

If you choose to use schema change events, consume the schema change topic and do not consume the database history topic.
Make sure that the schema change topic has a single partition so that schema changes are kept in the correct order. If the topic is created automatically, you can achieve this by setting the Kafka num.partitions configuration to 1.

Schema change topic structure

Each message that is written to the schema change topic contains a message key which includes the name of the connected database used when applying DDL statements:

{
  "schema": {
    "type": "struct",
    "name": "io.debezium.connector.mysql.SchemaChangeKey",
    "optional": false,
    "fields": [
      {
        "field": "databaseName",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "databaseName": "inventory"
  }
}

The schema change event message value contains a structure that includes the DDL statements, the database to which the statements were applied, and the position in the binlog where the statements appeared:

{
  "schema": {
    "type": "struct",
    "name": "io.debezium.connector.mysql.SchemaChangeValue",
    "optional": false,
    "fields": [
      {
        "field": "databaseName",
        "type": "string",
        "optional": false
      },
      {
        "field": "ddl",
        "type": "string",
        "optional": false
      },
      {
        "field": "source",
        "type": "struct",
        "name": "io.debezium.connector.mysql.Source",
        "optional": false,
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ]
      }
    ]
  },
  "payload": {
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
    "source" : {
      "version": "0.10.0.Beta4",
      "name": "mysql-server-1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": null,
      "table": null,
      "query": null
    }
  }
}

Important tips regarding schema change topics

The ddl field may contain multiple DDL statements. Every statement applies to the database in the databaseName field, and the statements appear in the same order in which they were applied in the database. The source field is structured exactly as it is in a standard data change event written to a table-specific topic, which makes it useful for correlating events on different topics.
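Because one ddl value can hold several statements, a consumer may want to split it. Splitting naively on every semicolon breaks on literals such as COMMENT 'a;b', so the hypothetical helper below splits only on semicolons outside quoted strings and identifiers. It is a sketch under stated assumptions: it does not handle SQL comments or custom DELIMITER settings, and it is not part of any Debezium API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical consumer-side helper: splits a ddl field into statements. */
class DdlSplitter {

    static List<String> split(String ddl) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0; // the currently open quote character, or 0 outside quotes
        for (int i = 0; i < ddl.length(); i++) {
            char c = ddl.charAt(i);
            if (quote != 0) {
                current.append(c);
                if (c == '\\' && quote != '`' && i + 1 < ddl.length()) {
                    current.append(ddl.charAt(++i)); // keep an escaped character verbatim
                } else if (c == quote) {
                    quote = 0; // closing quote
                }
            } else if (c == '\'' || c == '"' || c == '`') {
                quote = c;
                current.append(c);
            } else if (c == ';') {
                addIfNotBlank(statements, current); // statement boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String s = sb.toString().trim();
        if (!s.isEmpty()) {
            out.add(s);
        }
    }
}
```

Applied to the ddl value in the example message above, the helper returns the CREATE TABLE statement and the ALTER TABLE statement as two separate entries.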

....
    "payload": {
        "databaseName": "inventory",
        "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,...
        "source" : {
            ....
        }
    }
....
What if a client submits DDL statements to multiple databases?
  • If MySQL applies them atomically, the connector takes the DDL statements in order, groups them by database, and creates a schema change event for each group.

  • If MySQL applies them individually, the connector creates a separate schema change event for each statement.
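The grouping rule for atomically applied DDL can be sketched in Java as follows. The Statement type and the class name are hypothetical, and the sketch assumes that all statements for the same database are collected into one group (rather than only consecutive runs), with each group joined back into a single ddl string in its original order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: group atomically applied DDL by database, preserving order. */
class DdlGrouping {

    static final class Statement {
        final String database;
        final String ddl;

        Statement(String database, String ddl) {
            this.database = database;
            this.ddl = ddl;
        }
    }

    /** Returns databaseName -> ddl text for one schema change event per group. */
    static Map<String, String> groupByDatabase(List<Statement> statements) {
        // LinkedHashMap keeps databases in the order they first appear.
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Statement s : statements) {
            groups.computeIfAbsent(s.database, db -> new ArrayList<>()).add(s.ddl);
        }
        Map<String, String> events = new LinkedHashMap<>();
        groups.forEach((db, ddls) -> events.put(db, String.join("; ", ddls) + ";"));
        return events;
    }
}
```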


MySQL connector events

All data change events produced by the Debezium MySQL connector contain a key and a value. The change event key and the change event value each contain a schema and a payload where the schema describes the structure of the payload and the payload contains the data.

The MySQL connector ensures that all Kafka Connect schema names adhere to the Avro schema name format. Avro names may contain only Latin letters, digits, and underscores, must not start with a digit, and use dots to separate the components of a full name. The connector replaces any other character with an underscore, which can lead to unexpected conflicts in schema names when the logical server names, database names, or table names contain characters that are replaced in this way.
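The following Java sketch illustrates why such conflicts arise. It applies the Avro naming rule per dot-separated component and replaces every invalid character with an underscore. The class name is hypothetical and the sketch mirrors the rule described above; it is not the connector's implementation.

```java
/** Hypothetical illustration of Avro-compatible name adjustment. */
class AvroNameSketch {

    static String adjust(String fullName) {
        StringBuilder sb = new StringBuilder(fullName.length());
        boolean componentStart = true; // true at the start of each dot-separated component
        for (int i = 0; i < fullName.length(); i++) {
            char c = fullName.charAt(i);
            if (c == '.') {
                sb.append(c); // dots separate namespace components
                componentStart = true;
                continue;
            }
            boolean letterOrUnderscore = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_';
            boolean digit = c >= '0' && c <= '9';
            // Digits are valid anywhere except as the first character of a component.
            boolean valid = letterOrUnderscore || (digit && !componentStart);
            sb.append(valid ? c : '_');
            componentStart = false;
        }
        return sb.toString();
    }
}
```

For example, a database named db-1 and another named db_1 both adjust to db_1, so their schema names collide.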

Change event key

For any given table, the change event’s key has a structure that contains a field for each column in the PRIMARY KEY (or unique constraint) at the time the event was created. Let us look at an example table and then how the schema and payload would appear for the table.

example table
CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
example change event key
{
  "schema": { (1)
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key", (2)
    "optional": false, (3)
    "fields": [ (4)
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": { (5)
    "id": 1001
  }
}
  1. schema describes what is in the payload

  2. mysql-server-1.inventory.customers.Key is the name of the schema which defines the structure where mysql-server-1 is the connector name, inventory is the database and customers is the table

  3. denotes that the payload is not optional

  4. specifies the type of fields expected in the payload

  5. the payload itself which in this case only contains a single id field

This key describes the row in the inventory.customers table, captured by the connector named mysql-server-1, whose id primary key column has a value of 1001.

Change event value

The change event value contains a schema and a payload section. There are three types of change event values (create, update, and delete), and all three share the same envelope structure. The fields in this structure are explained below and marked on each of the change event value examples.

  1. name: mysql-server-1.inventory.customers.Envelope is the name of the schema that defines the overall structure of the payload, where mysql-server-1 is the connector name, inventory is the database, and customers is the table.

  2. op: A mandatory string that describes the type of operation. Values:
       • c = create
       • u = update
       • d = delete
       • r = read (initial snapshot only)

  3. before: An optional field that specifies the state of the row before the event occurred.

  4. after: An optional field that specifies the state of the row after the event occurred.

  5. source: A mandatory field that describes the source metadata for the event, including:
       • the Debezium version
       • the connector name
       • the name of the binlog file where the event was recorded
       • the binlog position
       • the row within the event
       • whether the event was part of a snapshot
       • the name of the affected database and table
       • the ID of the MySQL thread that created the event (non-snapshot only)
       • the MySQL server ID (if available)
       • the timestamp
     If the MySQL binlog_rows_query_log_events option is enabled and the connector's include.query option is enabled, source also contains a query field with the original SQL statement that generated the event.

  6. ts_ms: An optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.
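A consumer typically switches on the op field first. The hypothetical Java helper below maps each op value to an operation and reports which of before and after should be populated. It assumes the server logs full row images (binlog_row_image=FULL), so that before is present for updates and deletes; the class and enum names are invented for this sketch.

```java
/** Hypothetical consumer-side helper for the op field of a change event value. */
class OpCodes {

    enum Operation { CREATE, UPDATE, DELETE, READ }

    static Operation parse(String op) {
        switch (op) {
            case "c": return Operation.CREATE;
            case "u": return Operation.UPDATE;
            case "d": return Operation.DELETE;
            case "r": return Operation.READ; // emitted only during the initial snapshot
            default: throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    /** before holds the old row state for updates and deletes; it is null otherwise. */
    static boolean hasBefore(Operation op) {
        return op == Operation.UPDATE || op == Operation.DELETE;
    }

    /** after holds the new row state for creates, snapshot reads, and updates. */
    static boolean hasAfter(Operation op) {
        return op != Operation.DELETE;
    }
}
```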

Let us look at an example table and then how the schema and payload would appear for the table.

example table
CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

Create change event value

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" (1)
  },
  "payload": {
    "op": "c", (2)
    "ts_ms": 1465491411815, (6)
    "before": null, (3)
    "after": { (4)
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { (5)
      "version": "0.10.0.Beta4",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}

Update change event value

{
  "schema": { ... },
  "payload": {
    "before": { (3)
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { (4)
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { (5)
      "version": "0.10.0.Beta4",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", (2)
    "ts_ms": 1465581029523 (6)
  }
}

Delete change event value

{
  "schema": { ... },
  "payload": {
    "before": { (3)
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, (4)
    "source": { (5)
      "version": "0.10.0.Beta4",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", (2)
    "ts_ms": 1465581902461 (6)
  }
}

How the MySQL connector maps data types

The Debezium MySQL connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. The MySQL data type of that column dictates how the value is represented in the event.

Columns that store strings are defined in MySQL with a character set and collation. The MySQL connector uses the column’s character set when reading the binary representation of the column values in the binlog events. The following table shows how the connector maps the MySQL data types to both literal and semantic types.

  • literal type : how the value is represented using Kafka Connect schema types

  • semantic type : how the Kafka Connect schema captures the meaning of the field (schema name)

MySQL type          Literal type   Semantic type
BOOLEAN, BOOL       BOOLEAN        n/a
BIT(1)              BOOLEAN        n/a
BIT(>1)             BYTES          io.debezium.data.Bits
    The length schema parameter contains an integer that represents the number of bits. The byte[] contains the bits in little-endian form and is sized to contain the specified number of bits, where n is the number of bits:
    numBytes = n/8 + (n%8 == 0 ? 0 : 1)
TINYINT             INT16          n/a
SMALLINT[(M)]       INT16          n/a
MEDIUMINT[(M)]      INT32          n/a
INT, INTEGER[(M)]   INT32          n/a
BIGINT[(M)]         INT64          n/a
REAL[(M,D)]         FLOAT32        n/a
FLOAT[(M,D)]        FLOAT64        n/a
DOUBLE[(M,D)]       FLOAT64        n/a
CHAR[(M)]           STRING         n/a
VARCHAR(M)          STRING         n/a
BINARY[(M)]         BYTES          n/a
VARBINARY(M)        BYTES          n/a
TINYBLOB            BYTES          n/a
TINYTEXT            STRING         n/a
BLOB                BYTES          n/a
TEXT                STRING         n/a
MEDIUMBLOB          BYTES          n/a
MEDIUMTEXT          STRING         n/a
LONGBLOB            BYTES          n/a
LONGTEXT            STRING         n/a
JSON                STRING         io.debezium.data.Json
    Contains the string representation of a JSON document, array, or scalar.
ENUM                STRING         io.debezium.data.Enum
    The allowed schema parameter contains the comma-separated list of allowed values.
SET                 STRING         io.debezium.data.EnumSet
    The allowed schema parameter contains the comma-separated list of allowed values.
YEAR[(2|4)]         INT32          io.debezium.time.Year
TIMESTAMP[(M)]      STRING         io.debezium.time.ZonedTimestamp
    In ISO 8601 format with microsecond precision. MySQL allows M to be in the range of 0-6.
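The numBytes formula for BIT(>1) columns, and one way to read the bits back, can be checked with a short Java sketch. It interprets "little-endian form" as byte 0 holding bits 0 to 7, byte 1 holding bits 8 to 15, and so on; java.util.BitSet.valueOf reads a byte array in exactly that layout. That interpretation and the class name BitsSketch are assumptions for illustration.

```java
import java.util.BitSet;

/** Hypothetical sketch for BIT(n) values represented as io.debezium.data.Bits. */
class BitsSketch {

    /** Number of bytes needed for n bits: n/8, rounded up. */
    static int numBytes(int n) {
        return n / 8 + (n % 8 == 0 ? 0 : 1);
    }

    /**
     * Decodes the byte[] assuming byte 0 holds the lowest-numbered bits.
     * BitSet.valueOf sets bit k when (bytes[k / 8] & (1 << (k % 8))) != 0.
     */
    static BitSet decode(byte[] bytes) {
        return BitSet.valueOf(bytes);
    }
}
```

For example, a BIT(10) column needs numBytes(10) = 2 bytes.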

Temporal values

Excluding the TIMESTAMP data type, how the connector represents MySQL temporal types depends on the value of the time.precision.mode configuration property.

Temporal values without time zone information (DATETIME) are interpreted as UTC and converted to milliseconds or microseconds since the epoch, depending on the column precision. TIMESTAMP values are converted from the configured database time zone to UTC. In the following examples, the database time zone is 7 hours behind UTC:

  • DATETIME with a value of 2018-06-20 06:37:03 becomes 1529476623000.

  • TIMESTAMP with a value of 2018-06-20 06:37:03 becomes 2018-06-20T13:37:03Z.

MySQL allows zero values for DATE, DATETIME, and TIMESTAMP columns, which are sometimes preferred over null values. The MySQL connector represents them as null values when the column definition allows nulls, or as the epoch day when the column does not allow nulls.
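The two example conversions can be reproduced with java.time. The sketch below is an illustration, not the connector's code; the class name is hypothetical, and the UTC-7 database time zone used in the usage note is inferred from the TIMESTAMP example rather than stated in it.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

/** Hypothetical sketch reproducing the DATETIME and TIMESTAMP conversions. */
class TemporalSketch {

    /** DATETIME with precision 0-3: the wall-clock value read as UTC, in epoch milliseconds. */
    static long datetimeToEpochMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** DATETIME with precision 4-6: same interpretation, in epoch microseconds. */
    static long datetimeToEpochMicros(LocalDateTime value) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, value.toInstant(ZoneOffset.UTC));
    }

    /** TIMESTAMP: the value is in the database time zone and is normalized to UTC. */
    static String timestampToIsoUtc(LocalDateTime value, ZoneId databaseZone) {
        return value.atZone(databaseZone).toInstant().toString();
    }
}
```

With LocalDateTime.of(2018, 6, 20, 6, 37, 3), datetimeToEpochMillis returns 1529476623000, and timestampToIsoUtc with ZoneOffset.ofHours(-7) returns 2018-06-20T13:37:03Z, matching the examples above.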

time.precision.mode=adaptive_time_microseconds (default)

The MySQL connector determines the literal type and semantic type based on the column's data type definition so that events represent exactly the values in the database. DATETIME values use millisecond or microsecond precision depending on the column's fractional-second precision, and TIME values are always represented in microseconds.

MySQL type                              Literal type   Semantic type
DATE                                    INT32          io.debezium.time.Date
    Represents the number of days since epoch.
TIME[(M)]                               INT64          io.debezium.time.MicroTime
    Represents the time value in microseconds and does not include timezone information. MySQL allows M to be in the range of 0-6.
DATETIME, DATETIME(0) to DATETIME(3)    INT64          io.debezium.time.Timestamp
    Represents the number of milliseconds past epoch and does not include timezone information.
DATETIME(4) to DATETIME(6)              INT64          io.debezium.time.MicroTimestamp
    Represents the number of microseconds past epoch and does not include timezone information.
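The precision-dependent choice in the table above amounts to a small lookup. The following Java sketch encodes only the rows listed there; the class and method names are hypothetical.

```java
/** Hypothetical lookup for the adaptive_time_microseconds table above. */
class AdaptiveTimeMapping {

    /**
     * @param mysqlType DATE, TIME, or DATETIME
     * @param fsp       fractional-second precision (0 when the column declares none)
     */
    static String semanticType(String mysqlType, int fsp) {
        if (fsp < 0 || fsp > 6) {
            throw new IllegalArgumentException("MySQL fractional-second precision is 0-6");
        }
        switch (mysqlType) {
            case "DATE":
                return "io.debezium.time.Date";
            case "TIME":
                return "io.debezium.time.MicroTime";          // always microseconds
            case "DATETIME":
                return fsp <= 3
                        ? "io.debezium.time.Timestamp"        // milliseconds past epoch
                        : "io.debezium.time.MicroTimestamp";  // microseconds past epoch
            default:
                throw new IllegalArgumentException("Not covered by this table: " + mysqlType);
        }
    }
}
```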

time.precision.mode=connect

The MySQL connector uses the predefined Kafka Connect logical types. This approach is less precise than the default approach: events could lose precision if the database column has a fractional second precision value greater than 3.

MySQL type      Literal type   Semantic type
DATE            INT32          org.apache.kafka.connect.data.Date
    Represents the number of days since epoch.
TIME[(M)]       INT32          org.apache.kafka.connect.data.Time
    Represents the time value in milliseconds since midnight and does not include timezone information.
DATETIME[(M)]   INT64          org.apache.kafka.connect.data.Timestamp
    Represents the number of milliseconds since epoch, and does not include timezone information.

Decimal values

Decimals are handled via the decimal.handling.mode property.

decimal.handling.mode=precise

MySQL type         Literal type   Semantic type
NUMERIC[(M[,D])]   BYTES          org.apache.kafka.connect.data.Decimal
    The scale schema parameter contains an integer that represents how many digits the decimal point is shifted.
DECIMAL[(M[,D])]   BYTES          org.apache.kafka.connect.data.Decimal
    The scale schema parameter contains an integer that represents how many digits the decimal point is shifted.

decimal.handling.mode=double

MySQL type         Literal type   Semantic type
NUMERIC[(M[,D])]   FLOAT64        n/a
DECIMAL[(M[,D])]   FLOAT64        n/a

decimal.handling.mode=string

MySQL type         Literal type   Semantic type
NUMERIC[(M[,D])]   STRING         n/a
DECIMAL[(M[,D])]   STRING         n/a
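In precise mode, the BYTES value follows the Kafka Connect Decimal logical type: it holds the unscaled integer in big-endian two's-complement form (the layout produced by BigInteger.toByteArray), and the scale schema parameter says how far to shift the decimal point. The Java sketch below shows the round trip; the class name is hypothetical, and a real consumer would read the scale from the field's schema parameters.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/** Hypothetical sketch of the decimal.handling.mode=precise encoding. */
class DecimalSketch {

    /** Unscaled value as big-endian two's-complement bytes; the scale travels in the schema. */
    static byte[] encode(BigDecimal value) {
        return value.unscaledValue().toByteArray();
    }

    /** Rebuilds the value from the bytes and the scale schema parameter. */
    static BigDecimal decode(byte[] bytes, int scale) {
        return new BigDecimal(new BigInteger(bytes), scale);
    }
}
```

For example, 12.34 has unscaled value 1234 (0x04D2) and scale 2, so it is carried as the two bytes 0x04 0xD2.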

Boolean values

MySQL handles the BOOLEAN type in a specific way: internally, it maps a BOOLEAN column to the TINYINT(1) data type. When a table is created during streaming, the connector uses the proper BOOLEAN mapping because Debezium receives the original DDL. During a snapshot, however, Debezium executes SHOW CREATE TABLE to obtain the table definition, which returns TINYINT(1) for both BOOLEAN and TINYINT(1) columns.

Debezium then has no way to recover the original type and maps such columns as TINYINT(1). The operator can configure the out-of-the-box TinyIntOneToBooleanConverter custom converter, which maps all TINYINT(1) columns to BOOLEAN or, if the selector parameter is set, only the columns that match a comma-separated list of regular expressions.

An example configuration is

converters=boolean
boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
boolean.selector=db1.table1.*, db1.table2.column1
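To illustrate how a selector such as the one above might pick columns, the following Java sketch treats each comma-separated entry as a regular expression that must fully match the column's database.table.column name, and converts every TINYINT(1) column when no selector is set. The class is hypothetical, and the matching details (full match, trimming, case sensitivity) are assumptions, not the converter's documented behavior. The value conversion follows MySQL's rule that zero is false and any non-zero value is true.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical illustration of selector-based TINYINT(1) to BOOLEAN conversion. */
class TinyIntOneSelectorSketch {

    private final List<Pattern> patterns = new ArrayList<>();

    TinyIntOneSelectorSketch(String selector) {
        if (selector != null) {
            for (String regex : selector.split(",")) {
                if (!regex.trim().isEmpty()) {
                    patterns.add(Pattern.compile(regex.trim()));
                }
            }
        }
    }

    /** Assumption: a column is selected when database.table.column fully matches a pattern. */
    boolean convertsColumn(String database, String table, String column) {
        if (patterns.isEmpty()) {
            return true; // no selector: every TINYINT(1) column is converted
        }
        String fullName = database + "." + table + "." + column;
        for (Pattern p : patterns) {
            if (p.matcher(fullName).matches()) {
                return true;
            }
        }
        return false;
    }

    /** MySQL treats zero as false and any non-zero value as true. */
    static boolean toBoolean(int tinyIntValue) {
        return tinyIntValue != 0;
    }
}
```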

Spatial data types

Currently, the Debezium MySQL connector supports the following spatial data types:

MySQL type: GEOMETRY, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION
Literal type: STRUCT
Semantic type: io.debezium.data.geometry.Geometry

Contains a structure with two fields:

  • srid (INT32): a spatial reference system identifier that defines the coordinate reference system of the geometry object stored in the structure

  • wkb (BYTES): a binary representation of the geometry object encoded in the Well-Known Binary (WKB) format. See the Open Geospatial Consortium for more details.
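A consumer can inspect the wkb field with java.nio. Standard 2D WKB starts with a byte-order flag (0 = big-endian, 1 = little-endian), followed by a 4-byte geometry type code (1 = Point, 2 = LineString, 3 = Polygon, and so on); a Point then carries two 8-byte doubles. The Java sketch below reads that header and a Point's coordinates. The class name is hypothetical, and the sketch handles only the basic 2D OGC layout.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical sketch: reads the header of a standard 2D WKB value. */
class WkbSketch {

    /** Returns the geometry type code: 1 = Point, 2 = LineString, 3 = Polygon, ... */
    static int geometryType(byte[] wkb) {
        return header(wkb).getInt();
    }

    /** Returns {x, y} for a WKB Point. */
    static double[] point(byte[] wkb) {
        ByteBuffer buf = header(wkb);
        int type = buf.getInt();
        if (type != 1) {
            throw new IllegalArgumentException("Not a Point, type " + type);
        }
        return new double[] {buf.getDouble(), buf.getDouble()};
    }

    /** The first byte selects the byte order for the rest of the value. */
    private static ByteBuffer header(byte[] wkb) {
        ByteBuffer buf = ByteBuffer.wrap(wkb);
        byte order = buf.get();
        buf.order(order == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN);
        return buf;
    }
}
```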

The MySQL connector and Kafka topics

The Debezium MySQL connector writes events for all INSERT, UPDATE, and DELETE operations from a single table to a single Kafka topic. The Kafka topic naming convention is as follows:

Format:
serverName.databaseName.tableName

For example, suppose that fulfillment is the server name and inventory is a database that contains three tables: orders, customers, and products. The Debezium MySQL connector produces events on three Kafka topics, one for each table in the database:

fulfillment.inventory.orders

fulfillment.inventory.customers

fulfillment.inventory.products
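The naming convention can be expressed as a one-line function. This Java sketch, with a hypothetical class name, reproduces the three topic names from the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the serverName.databaseName.tableName convention. */
class TopicNames {

    static String topicFor(String serverName, String databaseName, String tableName) {
        return serverName + "." + databaseName + "." + tableName;
    }

    /** One topic per table in the database. */
    static List<String> topicsFor(String serverName, String databaseName, List<String> tables) {
        List<String> topics = new ArrayList<>();
        for (String table : tables) {
            topics.add(topicFor(serverName, databaseName, table));
        }
        return topics;
    }
}
```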

MySQL supported topologies

The Debezium MySQL connector supports the following MySQL topologies:

Standalone

When a single MySQL server is used, the server must have the binlog enabled (and optionally GTIDs enabled) so the Debezium MySQL connector can monitor the server. This is often acceptable, since the binary log can also be used as an incremental backup. In this case, the MySQL connector always connects to and follows this standalone MySQL server instance.

Master and slave

The Debezium MySQL connector can follow one of the masters or one of the slaves (if that slave has its binlog enabled), but the connector only sees changes in the cluster that are visible to that server. Generally, this is not a problem except for the multi-master topologies.

The connector records its position in the server’s binlog, which is different on each server in the cluster. Therefore, the connector will need to follow just one MySQL server instance. If that server fails, it must be restarted or recovered before the connector can continue.

High availability clusters

A variety of high availability solutions exist for MySQL, and they make it far easier to tolerate and almost immediately recover from problems and failures. Most HA MySQL clusters use GTIDs so that slaves are able to keep track of all changes on any of the masters.

Multi-master

A multi-master MySQL topology uses one or more MySQL slaves that each replicate from multiple masters. This is a powerful way to aggregate the replication of multiple MySQL clusters, and requires using GTIDs.

The Debezium MySQL connector can use these multi-master MySQL slaves as sources, and can fail over to a different multi-master MySQL slave as long as the new slave has caught up to the old slave (that is, the new slave has all of the transactions that were last seen on the first slave). This works even if the connector is only using a subset of databases and/or tables, because the connector can be configured to include or exclude specific GTID sources when it reconnects to a new multi-master MySQL slave and looks for the correct position in the binlog.
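The "caught up" condition amounts to GTID set containment: every transaction the connector last saw must appear in the new slave's executed GTID set. The Java sketch below parses MySQL's text form (for example uuid:1-5:7,uuid2:1-3), merges overlapping or adjacent intervals, and checks containment. The class is hypothetical and does not handle every GTID syntax variant; filtering by included or excluded GTID sources is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: GTID set parsing and containment check. */
class GtidSetSketch {

    /** source UUID -> sorted, merged intervals of transaction numbers */
    private final Map<String, List<long[]>> intervals = new HashMap<>();

    GtidSetSketch(String text) {
        for (String part : text.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) continue;
            String[] fields = trimmed.split(":");
            List<long[]> list = intervals.computeIfAbsent(fields[0].toLowerCase(), k -> new ArrayList<>());
            for (int i = 1; i < fields.length; i++) {
                String[] bounds = fields[i].split("-");
                long start = Long.parseLong(bounds[0]);
                long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
                list.add(new long[] {start, end});
            }
        }
        intervals.replaceAll((uuid, list) -> merge(list));
    }

    /** Sorts intervals and merges overlapping or adjacent ones (e.g. 1-5 and 6-10 become 1-10). */
    private static List<long[]> merge(List<long[]> list) {
        list.sort((a, b) -> Long.compare(a[0], b[0]));
        List<long[]> merged = new ArrayList<>();
        for (long[] iv : list) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && iv[0] <= last[1] + 1) {
                last[1] = Math.max(last[1], iv[1]);
            } else {
                merged.add(new long[] {iv[0], iv[1]});
            }
        }
        return merged;
    }

    /** True if every transaction in other is also in this set. */
    boolean contains(GtidSetSketch other) {
        for (Map.Entry<String, List<long[]>> e : other.intervals.entrySet()) {
            List<long[]> mine = intervals.getOrDefault(e.getKey(), List.of());
            for (long[] iv : e.getValue()) {
                boolean covered = false;
                for (long[] m : mine) {
                    if (m[0] <= iv[0] && iv[1] <= m[1]) {
                        covered = true;
                        break;
                    }
                }
                if (!covered) return false;
            }
        }
        return true;
    }
}
```

Merging first matters: without it, a set printed as 1-5:6-10 would appear not to cover 3-8, even though it does.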

Hosted

The Debezium MySQL connector can use hosted options such as Amazon RDS and Amazon Aurora.

Because these hosted options do not allow a global read lock, table-level locks are used to create the consistent snapshot.