Debezium connectors are used with the Kafka Connect framework to capture changes in databases and generate change events. The Kafka Connect workers then apply the transformations configured for the connector to each message the connector generates, serialize each message key and value into a binary form using the worker’s converters, and finally write each message into the correct Kafka topic.
The converters are specified in the Kafka Connect worker configuration, and the same converters are used for all connectors deployed to that worker’s cluster.
Kafka Connect comes with a JSON converter that serializes the message keys and values into JSON documents.
The JSON converter can be configured to include or exclude the message schema through its converter properties.
Our tutorial shows what the messages look like when both payload and schemas are included, but the schemas make the messages very verbose.
If you want your messages serialized with JSON, consider setting these properties to false to exclude the verbose schema information.
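To give a rough sense of the overhead, the sketch below compares the serialized size of one small value with and without the schema part. It assumes the properties in question are the JSON converter's `schemas.enable` settings (set per key and per value converter in the worker configuration). The field names, the schema name, and the sample values are hypothetical, and real Debezium events carry considerably more structure.

```python
import json

# Hypothetical change-event value; real Debezium events contain more fields.
payload = {"id": 1004, "first_name": "Anne", "email": "annek@noanswer.org"}

# Shape produced when schemas are enabled: a "schema" part describing every
# field, next to the "payload" part that holds the actual data.
with_schema = {
    "schema": {
        "type": "struct",
        "name": "example.Value",  # hypothetical schema name
        "optional": False,
        "fields": [
            {"field": "id", "type": "int32", "optional": False},
            {"field": "first_name", "type": "string", "optional": False},
            {"field": "email", "type": "string", "optional": False},
        ],
    },
    "payload": payload,
}

size_with_schema = len(json.dumps(with_schema).encode("utf-8"))
size_payload_only = len(json.dumps(payload).encode("utf-8"))

print(f"with schema:  {size_with_schema} bytes")
print(f"payload only: {size_payload_only} bytes")
```

Because the schema part is repeated in every message, its cost grows with the number of columns in the captured table rather than with the amount of data that changed.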
Another option is to serialize the message keys and values using Apache Avro. The Avro binary format is extremely compact and efficient, and Avro schemas make it possible to ensure that the messages have the correct structure. Avro’s schema evolution mechanism makes it possible to evolve the schemas over time, which is essential for Debezium connectors that dynamically generate the message schemas to match the structure of the database tables. Over time, the change events captured by Debezium connectors and written by Kafka Connect into a topic may have different versions of the same schema, and Avro serialization makes it far easier for consumers to adapt to the changing schema.
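The idea behind schema evolution can be illustrated with a small sketch. It is not the Avro library: it works on already-decoded records represented as plain dictionaries and only mimics one of Avro's resolution rules, namely that a field present in the reader's schema but absent from the written record takes the reader's default value, while fields the reader does not know are ignored. The `resolve` function and the field definitions are hypothetical names chosen for illustration.

```python
def resolve(record: dict, reader_fields: list) -> dict:
    """Project a decoded record onto the reader's fields, filling defaults."""
    resolved = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            # Field was added after the record was written: use the default.
            resolved[name] = field["default"]
        else:
            raise ValueError(f"no value and no default for field {name!r}")
    return resolved


# Reader schema after a hypothetical "phone" column was added to the table.
reader_fields_v2 = [
    {"name": "id", "type": "int"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": None},
]

old_record = {"id": 1004, "email": "annek@noanswer.org"}  # written before the change
new_record = {"id": 1005, "email": "x@example.com", "phone": "555-0100"}

print(resolve(old_record, reader_fields_v2))
print(resolve(new_record, reader_fields_v2))
```

A consumer that reads with the newer schema can therefore process events written before and after the table change without special-casing either version.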
Confluent provides several components that work with Avro:
An Avro Converter that can be used in Kafka Connect workers to map the Kafka Connect schemas into Avro schemas and to then use those Avro schemas to serialize the message keys and values into the very compact Avro binary form.
A Schema Registry that tracks all of the Avro schemas used in Kafka topics, and where the Avro Converter sends the generated Avro schemas. Since the Avro schemas are stored in this registry, each message need only include a tiny schema identifier. This makes each message even smaller, and for an I/O bound system like Kafka this means more total throughput of the producers and consumers.
Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Any Kafka consumer applications you write to consume change events can use the Avro Serdes to deserialize the change events.
These Confluent components are open source, and you can install them into any Kafka distribution and use them with Kafka Connect. However, Confluent also provides a Confluent Open Source Platform that includes the standard Kafka distribution as well as these and other Confluent open source components, including several source and sink connectors. Some Docker images for Kafka Connect also contain the Avro converter. This includes recent Debezium Docker images that include the Debezium connectors, Kafka Connect, and the Avro converter.
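To make the "tiny schema identifier" concrete: messages written through the Schema Registry serializers are framed with a one-byte magic value of 0 followed by a four-byte big-endian schema ID, and the Avro-encoded data comes after that header. The sketch below splits such a message by hand. The function name and the sample bytes are hypothetical, and in practice the Avro Serdes do this for you.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0   # first byte of the Schema Registry framing
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def split_framed_message(data: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body) for a Schema Registry framed message."""
    if len(data) < HEADER_SIZE:
        raise ValueError("message too short for the 5-byte header")
    # ">bI" = big-endian, one signed byte, one unsigned 32-bit integer.
    magic, schema_id = struct.unpack(">bI", data[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[HEADER_SIZE:]


# Hypothetical message: header for schema ID 42 plus placeholder body bytes.
example = struct.pack(">bI", MAGIC_BYTE, 42) + b"\x02\x06foo"
schema_id, body = split_framed_message(example)
print(schema_id, len(body))
```

A consumer would use the extracted ID to fetch the writer's schema from the registry, typically caching it, before decoding the body.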
A system that wants to use Avro serialization needs to complete two steps:
Deploy a Schema Registry instance
Use these properties to configure the Kafka Connect instance:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Note: In addition to setting the key/value converters, it is strongly recommended to set the internal key/value converters to use JSON converters, which makes the stored configuration and offsets easier to inspect. Using the Avro converter for the internal converters is currently not possible because of a known issue.
See the MySQL and the Avro message format tutorial example for a quickstart with MySQL.
Deploy a Schema Registry instance:
docker run -it --rm --name schema-registry \
    --link zookeeper \
    -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
    -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
    -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
    -p 8081:8081 confluentinc/cp-schema-registry
Run a Kafka Connect image configured to use Avro:
docker run -it --rm --name connect \
    --link zookeeper:zookeeper \
    --link kafka:kafka \
    --link mysql:mysql \
    --link schema-registry:schema-registry \
    -e GROUP_ID=1 \
    -e CONFIG_STORAGE_TOPIC=my_connect_configs \
    -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
    -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
    -e INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
    -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    -p 8083:8083 debezium/connect:1.1
Run a console consumer that reads new Avro messages from the db.myschema.mytable topic and decodes them to JSON:
docker run -it --rm --name avro-consumer \
    --link zookeeper:zookeeper \
    --link kafka:kafka \
    --link mysql:mysql \
    --link schema-registry:schema-registry \
    debezium/connect:1.1 \
    /kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server kafka:9092 \
      --property print.key=true \
      --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
      --property schema.registry.url=http://schema-registry:8081 \
      --topic db.myschema.mytable
As stated in the Avro documentation, names must adhere to the following rules:
Start with [A-Za-z_]
Subsequently contain only [A-Za-z0-9_] characters
Debezium uses the column’s name as the basis for the Avro field.
This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules above.
Debezium provides a configuration option, sanitize.field.names, that can be set to true if you have columns that do not adhere to the rule set above, allowing those fields to be serialized without having to actually modify your schema.
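As a rough illustration of what sanitizing involves, the sketch below checks a column name against the Avro naming rules (first character in [A-Za-z_], remaining characters in [A-Za-z0-9_]) and rewrites names that fail. The replacement scheme, underscores for disallowed characters and an underscore prefix for a bad first character, is an assumption made for this example and not necessarily what Debezium does internally. The function names are hypothetical.

```python
import re

# Avro name rule: first character [A-Za-z_], remaining characters [A-Za-z0-9_].
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_avro_name(name: str) -> bool:
    return AVRO_NAME.fullmatch(name) is not None


def sanitize(name: str) -> str:
    """Hypothetical sanitizer: map a column name onto a valid Avro name."""
    if not name:
        return "_"
    # Replace every character outside the allowed set with an underscore.
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    # A leading digit is still invalid, so prefix an underscore in that case.
    if not re.match(r"[A-Za-z_]", cleaned):
        cleaned = "_" + cleaned
    return cleaned


for column in ["customer_id", "order-id", "first name", "2nd_address"]:
    print(column, is_valid_avro_name(column), sanitize(column))
```

Note that with a scheme like this, two distinct column names (for example `order-id` and `order id`) map to the same field name, so sanitized names are worth checking for collisions.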