Avro Serialization

Debezium connectors are used with the Kafka Connect framework to capture changes in databases and generate change events. The Kafka Connect workers then apply the transformations configured for the connector to each message that the connector generates, serialize each message key and value into binary form using the configured converters, and finally write each message to the correct Kafka topic.

The converters can be specified in the Kafka Connect worker configuration, in which case the same converters are used for all connectors deployed to that worker’s cluster. Alternatively, they can be specified for an individual connector. Kafka Connect comes with a JSON converter that serializes the message keys and values into JSON documents. The JSON converter can be configured to include or exclude the message schema using the key.converter.schemas.enable and value.converter.schemas.enable properties. Our tutorial shows what the messages look like when both payload and schemas are included, but the schemas make the messages very verbose. If you want your messages serialized with JSON, consider setting these properties to false to exclude the verbose schema information.
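
To make the size difference concrete, here is a minimal Python sketch that mimics the shape of the JSON converter's output with and without the embedded schema. It does not call Kafka Connect; the key, the schema name, and the serialize helper are illustrative assumptions, not output captured from a real connector.

```python
import json

# Hypothetical change-event key for a row whose primary key is id=1001.
payload = {"id": 1001}

# Kafka Connect schema for that key, in the form the JSON converter embeds
# when schemas.enable=true (the schema name is made up for this example).
schema = {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": False,
    "fields": [{"field": "id", "type": "int32", "optional": False}],
}


def serialize(payload, schema=None):
    """Return a schema/payload envelope if a schema is given, else the bare payload."""
    doc = payload if schema is None else {"schema": schema, "payload": payload}
    return json.dumps(doc, separators=(",", ":")).encode("utf-8")


with_schema = serialize(payload, schema)   # like schemas.enable=true
without_schema = serialize(payload)        # like schemas.enable=false

print(len(with_schema), "bytes with schema")
print(len(without_schema), "bytes without schema")
```

Even for a single-column key, the schema accounts for most of the bytes, and the overhead is repeated in every message.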

Alternatively, you can serialize the message keys and values using Apache Avro. The Avro binary format is extremely compact and efficient, and Avro schemas make it possible to ensure that the messages have the correct structure. Avro’s schema evolution mechanism makes it possible to evolve the schemas over time, which is essential for Debezium connectors that dynamically generate the message schemas to match the structure of the database tables. Over time, the change events captured by Debezium connectors and written by Kafka Connect into a topic may have different versions of the same schema, and Avro serialization makes it far easier for consumers to adapt to the changing schema.
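
The following Python sketch illustrates two of Avro's schema resolution rules on already-decoded records: a field missing from the writer's data takes the reader's default, and a writer field unknown to the reader is ignored. It is a simplified illustration that does not use an Avro library, and the schema, field names, and resolve helper are hypothetical.

```python
# Reader's (newer) schema: "email" was added later, with a default value.
reader_schema = {
    "type": "record",
    "name": "Value",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "first_name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": None},
    ],
}


def resolve(record, reader_schema):
    """Project a decoded record onto the reader's schema."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]        # field present in writer data
        elif "default" in field:
            resolved[name] = field["default"]    # fall back to reader default
        else:
            raise ValueError(f"no value or default for field {name!r}")
    return resolved                              # unknown writer fields dropped


# Event written before the "email" column existed.
old_event = {"id": 1001, "first_name": "Sally"}
# Event written with a column the reader does not know about.
new_event = {"id": 1002, "first_name": "George", "email": "g@example.com", "legacy_flag": True}

print(resolve(old_event, reader_schema))
print(resolve(new_event, reader_schema))
```

A consumer written against the newer schema can therefore read events produced under either version of the table structure.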

The Apicurio API and Schema Registry

The open-source project Apicurio Registry provides several components that work with Avro:

  • An Avro converter that can be used in Kafka Connect workers to map the Kafka Connect schemas into Avro schemas and to then use those Avro schemas to serialize the message keys and values into the very compact Avro binary form.

  • An API/Schema registry that tracks all of the Avro schemas used in Kafka topics, and where the Avro Converter sends the generated Avro schemas. Since the Avro schemas are stored in this registry, each message need only include a tiny schema identifier. This makes each message even smaller, and for an I/O bound system like Kafka this means more total throughput for the producers and consumers.

  • Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Any Kafka consumer applications you write to consume change events can use the Avro Serdes to deserialize the change events.

You can install them into any Kafka distribution and use them with Kafka Connect.
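
The idea of a "tiny schema identifier" can be sketched as follows: a message carries a short header with the identifier, followed by the Avro-encoded bytes. This Python sketch assumes a layout of one magic byte plus a big-endian identifier; the 8-byte width is an assumption about the Apicurio default, the 4-byte width corresponds to the Confluent layout, and the frame and unframe helpers are hypothetical names.

```python
import struct

MAGIC_BYTE = 0

# struct formats for the header: magic byte + big-endian schema identifier.
HEADER_FORMATS = {4: ">bi", 8: ">bq"}


def frame(schema_id, avro_payload, id_size=8):
    """Prefix Avro-encoded bytes with the magic byte and schema identifier."""
    return struct.pack(HEADER_FORMATS[id_size], MAGIC_BYTE, schema_id) + avro_payload


def unframe(message, id_size=8):
    """Split a framed message into (schema_id, avro_payload)."""
    fmt = HEADER_FORMATS[id_size]
    header_len = struct.calcsize(fmt)
    magic, schema_id = struct.unpack(fmt, message[:header_len])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte")
    return schema_id, message[header_len:]


# Placeholder payload bytes; a real payload would come from an Avro encoder.
message = frame(42, b"\x02\x0aSally")
print(len(message), unframe(message))
```

The consumer uses the identifier to fetch the schema from the registry once and can cache it, so the per-message overhead stays at a few bytes regardless of how large the schema is.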

The Apicurio project also provides a JSON converter that can be used with the Apicurio registry. This combines the advantage of less verbose messages (as messages do not contain the schema information themselves, but only a schema id) with human-readable JSON.

Another option is using the Confluent schema registry, which is described further below.

Technical Information

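A system that wants to use Avro serialization needs to complete two steps: deploy an Apicurio Registry instance, and configure the converters, either in the Kafka Connect worker configuration or for an individual connector, using properties such as these: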

key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/api
key.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/api
value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy

Note that Kafka Connect internally always uses the JSON key/value converters for storing configuration and offsets.
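
Because converters can also be set for an individual connector, the same properties can be placed in a connector's configuration. The Python sketch below only builds and prints such a registration document; the connector name is hypothetical, the database connection settings are omitted, and the avro_converter_config helper is an illustrative assumption rather than part of any Debezium API.

```python
import json

# Values taken from the worker properties shown above.
CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter"
REGISTRY_URL = "http://apicurio:8080/api"
ID_STRATEGY = "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy"


def avro_converter_config(registry_url=REGISTRY_URL):
    """Return the key/value converter properties as a flat dict."""
    config = {}
    for side in ("key", "value"):
        prefix = f"{side}.converter"
        config[prefix] = CONVERTER
        config[f"{prefix}.apicurio.registry.url"] = registry_url
        config[f"{prefix}.apicurio.registry.global-id"] = ID_STRATEGY
    return config


connector = {
    "name": "inventory-connector",  # hypothetical connector name
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        # Database connection settings are omitted from this sketch.
        **avro_converter_config(),
    },
}

print(json.dumps(connector, indent=2))
```

A document of this shape is what the Kafka Connect REST API accepts when a connector is registered, so the converter settings then apply to that connector only.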

Debezium Container Images

Deploy an Apicurio Registry instance (this example uses a non-production in-memory instance):

docker run -it --rm --name apicurio \
    -p 8080:8080 apicurio/apicurio-registry-mem:1.2.2.Final

Build a Debezium image with the Avro converter from a Dockerfile in the current directory:

docker build --build-arg DEBEZIUM_VERSION=1.2 -t debezium/connect-apicurio:1.2 .

Run a Kafka Connect image configured to use Avro:

docker run -it --rm --name connect \
    --link zookeeper:zookeeper \
    --link kafka:kafka \
    --link mysql:mysql \
    --link apicurio:apicurio \
    -e GROUP_ID=1 \
    -e CONFIG_STORAGE_TOPIC=my_connect_configs \
    -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
    -e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
    -e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
    -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
    -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/api \
    -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_GLOBAL-ID=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy \
    -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
    -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/api \
    -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_GLOBAL-ID=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy \
    -p 8083:8083 debezium/connect-apicurio:1.2
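
The CONNECT_-prefixed variables in the command above correspond to worker properties. As a sketch, and assuming the mapping used by the Debezium Connect image (drop the CONNECT_ prefix, lowercase the rest, replace underscores with dots), the translation can be expressed in Python as below. The env_to_property function is a hypothetical helper, not code from the image.

```python
def env_to_property(name):
    """Map a CONNECT_* environment variable name to a worker property name."""
    prefix = "CONNECT_"
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    # Hyphens are left untouched; only underscores become dots.
    return name[len(prefix):].lower().replace("_", ".")


for var in (
    "CONNECT_VALUE_CONVERTER",
    "CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL",
    "CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_GLOBAL-ID",
):
    print(var, "->", env_to_property(var))
```

Under this mapping the hyphen in GLOBAL-ID must be kept in the variable name, because the target property is apicurio.registry.global-id and an underscore there would be turned into a dot.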

Naming

As stated in the Avro documentation, names must adhere to the following rules:

  • Start with [A-Za-z_]

  • Subsequently contain only [A-Za-z0-9_] characters

Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules above. Debezium provides a configuration option, sanitize.field.names, that can be set to true if you have columns that do not adhere to these rules, allowing those fields to be serialized without having to modify your database schema.
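
The two naming rules can be checked with a short regular expression. The Python sketch below also shows one simple way to sanitize a name by replacing invalid characters with underscores; it is an approximation for illustration and is not claimed to match Debezium's exact sanitization algorithm. The function names and sample column names are hypothetical.

```python
import re

# Avro name rule: first character [A-Za-z_], remaining characters [A-Za-z0-9_].
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_avro_name(name):
    """Return True if the whole name satisfies the Avro naming rules."""
    return AVRO_NAME.fullmatch(name) is not None


def sanitize(name):
    """Replace invalid characters with '_' and ensure a valid first character."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if not cleaned or not re.match(r"[A-Za-z_]", cleaned):
        cleaned = "_" + cleaned  # e.g. a name that starts with a digit
    return cleaned


for column in ("first_name", "order-id", "1st_line", "total amount"):
    print(column, is_valid_avro_name(column), sanitize(column))
```

Note that a replacement scheme like this can map two distinct column names, such as "order-id" and "order id", to the same field name, so sanitized names are worth reviewing for collisions.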

Confluent Schema Registry

There is an alternative schema registry implementation provided by Confluent.

The configuration is slightly different. Here are the properties that should be used:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

An instance of the Confluent Schema Registry can be deployed like so:

docker run -it --rm --name schema-registry \
    --link zookeeper \
    -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
    -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
    -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
    -p 8081:8081 confluentinc/cp-schema-registry

Run a Kafka Connect image configured to use Avro:

docker run -it --rm --name connect \
    --link zookeeper:zookeeper \
    --link kafka:kafka \
    --link mysql:mysql \
    --link schema-registry:schema-registry \
    -e GROUP_ID=1 \
    -e CONFIG_STORAGE_TOPIC=my_connect_configs \
    -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
    -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    -p 8083:8083 debezium/connect:1.2

Run a console consumer which reads new Avro messages from the db.myschema.mytable topic and decodes to JSON:

docker run -it --rm --name avro-consumer \
    --link zookeeper:zookeeper \
    --link kafka:kafka \
    --link mysql:mysql \
    --link schema-registry:schema-registry \
    debezium/connect:1.2 \
    /kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server kafka:9092 \
      --property print.key=true \
      --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
      --property schema.registry.url=http://schema-registry:8081 \
      --topic db.myschema.mytable

Getting More Information

This post from the Debezium blog describes the concepts of serializers, converters, and related components, and discusses the advantages of using Avro. Note that some details around Kafka Connect converters have changed slightly since that post was written.

For a complete example of using Avro as the message format for Debezium data change events, please see the MySQL and the Avro message format tutorial example.