Custom Converters
This feature is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems while using this extension. |
Datatype Conversion
Each field in a Debezium change event record represents a field or column in the source table or data collection. When a connector emits a change event record to Kafka, it converts the data type of each field in the source to a Kafka Connect schema type. Column values are likewise converted to match the schema type of the destination field. For each connector, a default mapping specifies how the connector converts each data type. These default mappings are described in the data types documentation for each connector.
While the default mappings are generally sufficient, for some applications you might want to apply an alternate mapping. For example, you might need a custom mapping if the default mapping exports a column using the format of milliseconds since the UNIX epoch, but your downstream application can only consume the column values as formatted strings. You customize data type mappings by developing and deploying a custom converter. You configure custom converters to act on all columns of a certain type, or you can narrow their scope so that they apply to a specific table column only. The converter function intercepts data type conversion requests for any columns that match a specified criteria, and then performs the specified conversion. The converter ignores columns that do not match the specified criteria.
Custom converters are Java classes that implement the Debezium service provider interface (SPI).
You enable and configure a custom converter by setting the converters
property in the connector configuration.
The converters
property specifies the converters that are available to a connector, and can include sub-properties that further modify conversion behavior.
After you start a connector, the converters that are enabled in the connector configuration are instantiated and are added to a registry. The registry associates each converter with the columns or fields for it to process. Whenever Debezium processes a new change event, it invokes the configured converter to convert the columns or fields for which it is registered.
Implementing custom converters
The following example shows a converter implementation of a Java class that implements the interface io.debezium.spi.converter.CustomConverter
:
public interface CustomConverter<S, F extends ConvertedField> {
@FunctionalInterface
interface Converter { (1)
Object convert(Object input);
}
public interface ConverterRegistration<S> { (2)
void register(S fieldSchema, Converter converter); (3)
}
void configure(Properties props);
void converterFor(F field, ConverterRegistration<S> registration); (4)
}
1 | A function for converting data from one type to another. |
2 | Callback for registering a converter. |
3 | Registers the given schema and converter for the current field. Should not be invoked more than once for the same field. |
4 | Registers the customized value and schema converter for use with a specific field. |
Custom converter methods
Implementations of the CustomConverter
interface must include the following methods:
configure()
-
Passes the properties specified in the connector configuration to the converter instance. The
configure
method runs when the connector is initialized. You can use a converter with multiple connectors and modify its behavior based on the connector’s property settings.
Theconfigure
method accepts the following argument:props
-
Contains the properties to pass to the converter instance. Each property specifies the format for converting the values of a particular type of column.
converterFor()
-
Registers the converter to process specific columns or fields in the data source. Debezium invokes the
converterFor()
method to prompt the converter to callregistration
for the conversion. TheconverterFor
method runs once for each column.
The method accepts the following arguments:field
-
An object that passes metadata about the field or column that is processed. The column metadata can include the name of the column or field, the name of the table or collection, the data type, size, and so forth.
registration
-
An object of type
io.debezium.spi.converter.CustomConverter.ConverterRegistration
that provides the target schema definition and the code for converting the column data. The converter calls theregistration
parameter when the source column matches the type that the converter should process. calls theregister
method to define the converter for each column in the schema. Schemas are represented using the Kafka ConnectSchemaBuilder
API. In the future, an independent schema definition API will be added.
Debezium custom converter example
The following example implements a simple converter that performs the following operations:
-
Runs the
configure
method, which configures the converter based on the value of theschema.name
property that is specified in the connector configuration. The converter configuration is specific to each instance. -
Runs the
converterFor
method, which registers the converter to process values in source columns for which the data type is set toisbn
.-
Identifies the target
STRING
schema based on the value that is specified for theschema.name
property. -
Converts ISBN data in the source column to
String
values.
-
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private SchemaBuilder isbnSchema;
@Override
public void configure(Properties props) {
isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
}
@Override
public void converterFor(RelationalColumn column,
ConverterRegistration<SchemaBuilder> registration) {
if ("isbn".equals(column.typeName())) {
registration.register(isbnSchema, x -> x.toString());
}
}
}
Debezium and Kafka Connect API module dependencies
A custom converter Java project has compile dependencies on the Debezium API and Kafka Connect API library modules.
These compile dependencies must be included in your project’s pom.xml
, as shown in the following example:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version> (1)
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${version.kafka}</version> (2)
</dependency>
1 | ${version.debezium} represents the version of the Debezium connector. |
2 | ${version.kafka} represents the version of Apache Kafka in your environment. |
Configuring and Using Converters
Custom converters act on specific columns or column types in a source table to specify how to convert the data types in the source to Kafka Connect schema types. To use a custom converter with a connector, you deploy the converter JAR file alongside the connector file, and then configure the connector to use the converter.
Deploying a custom converter
-
You have a custom converter Java program.
-
To use a custom converter with a Debezium connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each Debezium connector that you want to use it with.
For example, in a typical deployment, the Debezium connector files are stored in subdirectories of a Kafka Connect directory (/kafka/connect
), with each connector JAR in its own subdirectory (/kafka/connect/debezium-connector-db2
,/kafka/connect/debezium-connector-mysql
, and so forth). To use a converter with a connector, add the converter JAR file to the connector’s subdirectory.
To use a converter with multiple connectors, you must place a copy of the converter JAR file in each connector subdirectory. |
Configuring a connector to use a custom converter
To enable a connector to use the custom converter, you add properties to the connector configuration that specify the converter name and class. If the converter requires further information to customize the formats of specific data types, you can also define other coniguration options to provide that information.
-
Enable a converter for a connector instance by adding the following mandatory properties to the connector configuration:
converters: <converterSymbolicName> (1) <converterSymbolicName>.type: <fullyQualifiedConverterClassName> (2)
1 The converters
property is mandatory and enumerates a comma-separated list of symbolic names of the converter instances to use with the connector. The values listed for this property serve as prefixes in the names of other properties that you specify for the converter.2 The <converterSymbolicName>.type
property is mandatory, and specifies the name of the class that implements the converter. For example, for the earlier custom converter example, you would add the following properties to the connector configuration:converters: isbn isbn.type: io.debezium.test.IsbnConverter
-
To associate other properties with a custom converter, prefix the property names with the symbolic name of the converter, followed by a dot (
.
). The symbolic name is a label that you specify as a value for theconverters
property. For example, to add a property for the precedingisbn
converter to specify theschema.name
to pass to theconfigure
method in the converter code, add the following property:isbn.schema.name: io.debezium.postgresql.type.Isbn