You are viewing documentation for an outdated version of Debezium.

Custom Converters

This feature is currently in an incubating state, i.e., the exact semantics, configuration options, etc. may change in future revisions based on the feedback we receive. Please let us know if you encounter any problems while using this extension.

Datatype Conversion

The Debezium connectors map database column types to corresponding Kafka Connect schema types and convert the column values accordingly. The specific column type mappings are documented for each connector and represent a reasonable default for most use cases. An application may still require special handling of a certain column type, or of a specific column, because of downstream system requirements. For instance, you might want to export temporal column values as a formatted string instead of milliseconds since the epoch.
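To make the temporal example concrete, the following is a minimal sketch of the kind of conversion logic such a requirement implies: turning a milliseconds-since-epoch value into a formatted string. It uses only the JDK; the class name TemporalFormatSketch, the method name formatEpochMillis, and the chosen pattern and UTC time zone are assumptions made for illustration and are not part of Debezium.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Debezium: formats epoch milliseconds as text.
final class TemporalFormatSketch {

    // The pattern and the UTC zone are arbitrary choices for this sketch.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    // Converts milliseconds since the epoch into a formatted UTC timestamp string.
    static String formatEpochMillis(long epochMillis) {
        return FORMATTER.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(formatEpochMillis(0L)); // prints 1970-01-01T00:00:00.000Z
    }
}
```

Logic of this kind is what a custom converter would typically wrap in its conversion function.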

For this purpose Debezium provides an extension point that allows users to inject their own converters based on their business requirements. The converters are written in Java and are enabled and configured via connector properties.

During connector startup, all configured converters are instantiated and placed in a registry. While the connector-internal schema representation is built, every converter is invoked for every column or field of every table or collection, and it can register itself as responsible for converting the given column or field.

Whenever Debezium processes a new change, the converter is invoked to perform the actual conversion for the columns or fields it registered for.
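The two phases described above (registration while the schema is built, then conversion for each change) can be modelled with a small, self-contained sketch. This is a simplified illustration, not Debezium's internal implementation: the types ColumnConverter and Registry, the method buildRegistry, and the sample column names are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model of the converter lifecycle; every type here is hypothetical.
final class ConverterLifecycleSketch {

    /** Stand-in for a converter: it is offered each column and may register for it. */
    interface ColumnConverter {
        void converterFor(String columnName, String typeName, Registry registry);
    }

    /** Stand-in registry that maps a column name to its conversion function. */
    static final class Registry {
        private final Map<String, Function<Object, Object>> byColumn = new HashMap<>();

        void register(String columnName, Function<Object, Object> conversion) {
            byColumn.put(columnName, conversion);
        }

        // Phase 2: apply the registered conversion, or pass the value through unchanged.
        Object convert(String columnName, Object value) {
            Function<Object, Object> conversion = byColumn.get(columnName);
            return conversion == null ? value : conversion.apply(value);
        }
    }

    // Phase 1: while the schema is built, offer every column to every converter.
    static Registry buildRegistry(Map<String, String> columnTypes, List<ColumnConverter> converters) {
        Registry registry = new Registry();
        for (Map.Entry<String, String> column : columnTypes.entrySet()) {
            for (ColumnConverter converter : converters) {
                converter.converterFor(column.getKey(), column.getValue(), registry);
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        // Registers only for columns whose type name is "isbn".
        ColumnConverter isbnToString = (name, type, reg) -> {
            if ("isbn".equals(type)) {
                reg.register(name, value -> String.valueOf(value));
            }
        };
        Registry registry = buildRegistry(
                Map.of("book_isbn", "isbn", "pages", "int4"), List.of(isbnToString));
        System.out.println(registry.convert("book_isbn", 9780134685991L)); // converted to a String
        System.out.println(registry.convert("pages", 412));                // left unchanged
    }
}
```

The point of the split is that the decision about which columns a converter handles is made once, up front, so the per-change path only has to look up and apply an already registered function.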

Implementing Converters

The converter implementation is a Java class that implements the interface io.debezium.spi.converter.CustomConverter:

public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {
        Object convert(Object input);
    }

    public interface ConverterRegistration<S> {
        void register(S fieldSchema, Converter converter);
    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration);
}

The configure() method passes converter configuration options to the converter after it is instantiated, so that each instance can adjust its runtime behavior. Debezium invokes the converterFor() method for each column or field; if the converter takes responsibility for the conversion, it must call registration.register(). The registration supplies the target schema definition and the actual conversion code. Schemas are currently represented using Kafka Connect’s SchemaBuilder API. In the future, an independent schema definition API will be added. Metadata about the column or field is passed via the field parameter. It contains information such as the table or collection name, the column or field name, the type name, and others.

The following example implements a simple converter that will:

  • accept one parameter named schema.name

  • register itself for every column of type isbn with

    • the target STRING schema named according to the schema.name parameter

    • the conversion code that converts the ISBN data to String

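import java.util.Properties;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

public class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    // Target schema, built once from the converter configuration
    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        // Name the STRING schema after the schema.name converter parameter
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {

        // Take responsibility only for columns of type isbn
        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}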

To compile the code, declare dependencies on the debezium-api and connect-api modules, for example:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version>
</dependency>

where ${version.debezium} and ${version.kafka} are the versions of Debezium and Apache Kafka, respectively.

Configuring and Using Converters

Once the converter is developed, it must be packaged in a JAR file and deployed alongside the Debezium connector JARs. To enable the converter for a given connector instance, provide connector options such as the following:

converters=isbn
isbn.type=io.debezium.test.IsbnConverter
isbn.schema.name=io.debezium.postgresql.type.Isbn

The converters option is mandatory and lists, separated by commas, the symbolic names of the converter instances to use. Each symbolic name serves as the prefix for that converter's further configuration options.

isbn.type (generally <prefix>.type) is mandatory and is the fully qualified name of the class that implements the converter.

isbn.schema.name is a converter parameter that is passed to the converter’s configure method as schema.name.
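The prefix convention can be illustrated with a short, self-contained sketch that splits the converters option into symbolic names and strips the prefix from each converter's options. This shows the naming scheme only and is not Debezium's own code: the class ConverterConfigSketch and its methods are hypothetical, and keeping the type entry among the stripped options is an assumption of the sketch, since the text only states how schema.name is passed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration of the "<prefix>.<option>" naming convention.
final class ConverterConfigSketch {

    // Splits the comma-separated "converters" option into symbolic names.
    static List<String> converterNames(Properties connectorProps) {
        List<String> names = new ArrayList<>();
        for (String name : connectorProps.getProperty("converters", "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    // Collects all "<prefix>.*" options with the prefix removed,
    // e.g. "isbn.schema.name" becomes "schema.name".
    static Properties converterProps(Properties connectorProps, String prefix) {
        Properties result = new Properties();
        String dotted = prefix + ".";
        for (String key : connectorProps.stringPropertyNames()) {
            if (key.startsWith(dotted)) {
                result.setProperty(key.substring(dotted.length()), connectorProps.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("converters", "isbn");
        props.setProperty("isbn.type", "io.debezium.test.IsbnConverter");
        props.setProperty("isbn.schema.name", "io.debezium.postgresql.type.Isbn");

        for (String name : converterNames(props)) {
            // For "isbn", schema.name resolves to io.debezium.postgresql.type.Isbn
            System.out.println(name + " -> " + converterProps(props, name).getProperty("schema.name"));
        }
    }
}
```

Because each converter's options live under its own prefix, several instances of the same converter class can be configured side by side under different symbolic names.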