ConvertCloudEventToSaveableForm
The ConvertCloudEventToSaveableForm
SMT is used in conjunction with CloudEventsConverter
and JdbcSinkConnector
that is it makes a
CloudEvent suitable for saving to a database.
It is done using the mapping between a CloudEvent and target database table columns; it specifies which CloudEvent field is
associated with which column. A new structure and its schema created by the transform contain fields with the names of target table
columns and values from an original CloudEvent. If a CloudEvent contains data
field as a structure, it is flattened, that is,
converted to JSON string.
To perform the conversion, SMT needs to know serializer type that is used to serialize and deserialize CloudEvents
(JSON or Avro). serializer.type
should have the same value as value.converter.serializer.type
.
Example
To convert a CloudEvent to a form suitable for JdbcSinkConnector
, configure the ConvertCloudEventToSaveableForm
SMT in the Kafka Connect configuration
for a connector. For example, to convert CloudEvents that were serialized and deserialized with JSON provided that the target database has
id
, source
, type
, and payload
columns, use the following configuration for the SMT:
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
...
"transforms": "convertCloudEvent",
"transforms.convertCloudEvent.type": "io.debezium.connector.jdbc.transforms.ConvertCloudEventToSaveableForm",
"transforms.convertCloudEvent.fields.mapping": "id,source,type,data:payload",
"transforms.convertCloudEvent.serializer.type": "json",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type": "json",
"value.converter.data.serializer.type": "json"
...
The following example shows the value of a record before and after the transformation is applied.
ConvertCloudEventToSaveableForm
SMT- Value before the SMT processes the record
-
{ "id": "624e6565-99ee-4fdb-9228-653138c3a7b3", "source": "/debezium/postgresql/book", "specversion": "1.0", "type": "BookCreated", "time": "2023-11-11T07:11:01.825Z", "datacontenttype": "application/json", "data": { "id": 4, "name": "1984", "publicationYear": 1949 } }
- Value after the SMT processes the record
-
{ "id": "624e6565-99ee-4fdb-9228-653138c3a7b3", "source": "/debezium/postgresql/book", "type": "BookCreated", "payload": "{"id": 4, "name": "1984", "publicationYear": 1949}" }
Configuration options
The following table lists the configuration options that you can use with the ConvertCloudEventToSaveableForm
SMT.
Property |
Description |
Type |
Default |
Valid Values |
Importance |
A comma-separated list of pairs each of which contains the name of a CloudEvents field followed by the name of a target table column |
list |
No default value |
non-empty list |
high |
|
Serializer type that is used to serialize and deserialize CloudEvents. Should have the same value as |
string |
No default value |
|
high |
|
CloudEvents schema name under which the schema is registered in a Schema Registry. |
string |
No default value |
low |