Debezium Engine
Debezium connectors are normally operated by deploying them to a Kafka Connect service, and configuring one or more connectors to monitor upstream databases and produce data change events for all changes that they see in the upstream databases. Those data change events are written to Kafka, where they can be independently consumed by many different applications. Kafka Connect provides excellent fault tolerance and scalability, since it runs as a distributed service and ensures that all registered and configured connectors are always running. For example, even if one of the Kafka Connect endpoints in a cluster goes down, the remaining Kafka Connect endpoints will restart any connectors that were previously running on the now-terminated endpoint, minimizing downtime and eliminating administrative activities.
Not every application needs this level of fault tolerance and reliability, and they may not want to rely upon an external cluster of Kafka brokers and Kafka Connect services. Instead, some applications would prefer to embed Debezium connectors directly within the application space. They still want the same data change events, but prefer to have the connectors send them directly to the application rather than persist them inside Kafka.
This debezium-api module defines a small API that allows an application to easily configure and run Debezium connectors using Debezium Engine.
Beginning with the 2.6.0 release, Debezium provides two implementations of the DebeziumEngine interface.
The older EmbeddedEngine implementation runs a single connector that uses only one task.
The connector emits all records sequentially.
EmbeddedEngine is the default implementation in Debezium release 3.1.0.Final and earlier.
Starting with Debezium release 3.2.0.Alpha1, the default implementation is AsyncEmbeddedEngine, and the EmbeddedEngine implementation is no longer available.
The newer AsyncEmbeddedEngine implementation has been available since the 2.6.0 release.
This implementation also runs only a single connector, but it can process records in multiple threads, and run multiple tasks, if the connector supports it (currently only the connectors for SQL Server and MongoDB support running multiple tasks within a single connector).
Because both of these engines implement the same interface and share the same API, the code examples that follow are valid for either engine.
Both implementations support the same configuration options.
However, the new AsyncEmbeddedEngine provides a couple of new configuration options for setting up and fine-tuning parallel processing.
For information about these new configuration options, see the Asynchronous Engine Properties.
To learn more about the motivation behind development of the AsyncEmbeddedEngine and about its implementation details, see the Asynchronous Embedded Engine design document.
Dependencies
To use the Debezium Engine module, add the debezium-api module to your application’s dependencies.
The debezium-embedded module provides an out-of-the-box implementation of this API, so add it to the dependencies as well.
For Maven, this entails adding the following to your application’s POM:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>
where ${version.debezium} is either the version of Debezium you’re using or a Maven property whose value contains the Debezium version string.
Likewise, add dependencies for each of the Debezium connectors that your application will use. For example, the following can be added to your application’s Maven POM file so your application can use the MySQL connector:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>
Or for the MongoDB connector:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mongodb</artifactId>
    <version>${version.debezium}</version>
</dependency>
The remainder of this document describes embedding the MySQL connector in your application. Other connectors are used in a similar manner, except with connector-specific configuration, topics, and events.
Packaging your project
Debezium uses the SPI via the ServiceLoader to load the implementation. The implementation can be based on the connector type, or it can be a custom implementation.
Some interfaces have multiple implementations.
For example, io.debezium.snapshot.spi.SnapshotLock has a default implementation in the core, and specific implementations for each connector.
To ensure that Debezium can locate the required implementation, you must explicitly configure your build tools to merge the META-INF/services files.
For example, if you are using the Maven shade plugin,
add the ServicesResourceTransformer transformer, as shown in the following example:
...
<configuration>
 <transformers>
    ...
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
    ...
 </transformers>
...
</configuration>
Alternatively, if you use the Maven Assembly plug-in, you can use the metaInf-services container descriptor handlers.
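One way to check the result of the merge is to list the provider entries that are visible on the classpath of the packaged application. The following is a minimal sketch that uses only the standard library; the class name ServiceFileInspector is hypothetical and not part of Debezium. It reads every META-INF/services file for a given interface and returns the provider class names it finds.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of Debezium.
class ServiceFileInspector {

    /** Returns the provider class names declared in all META-INF/services files for the interface. */
    static List<String> providers(String serviceInterface) {
        List<String> names = new ArrayList<>();
        String resource = "META-INF/services/" + serviceInterface;
        try {
            ClassLoader loader = ServiceFileInspector.class.getClassLoader();
            for (URL url : Collections.list(loader.getResources(resource))) {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // Provider files may contain '#' comments and blank lines; skip both.
                        int hash = line.indexOf('#');
                        String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
                        if (!name.isEmpty()) {
                            names.add(name);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        String iface = args.length > 0 ? args[0] : "io.debezium.snapshot.spi.SnapshotLock";
        System.out.println(iface + " -> " + providers(iface));
    }
}
```

If the list printed for an interface such as io.debezium.snapshot.spi.SnapshotLock is missing an implementation you expect, the service files were probably overwritten rather than merged during packaging.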
In the Code
Your application needs to set up an embedded engine for each connector instance you want to run.
The io.debezium.engine.DebeziumEngine<R> class serves as an easy-to-use wrapper around any Debezium connector and completely manages the connector’s lifecycle.
You create the DebeziumEngine instance using its builder API, providing the following things:
- The format in which you want to receive the message, e.g. JSON, Avro, or as a Kafka Connect SourceRecord (see output message formats)
- Configuration properties (perhaps loaded from a properties file) that define the environment for both the engine and the connector
- A method that will be called for every data change event produced by the connector
Here’s an example of code that configures and runs an embedded MySQL connector:
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");
// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        }).build()
    ) {
    // Run the engine asynchronously ...
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);
    // Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished
Let’s look into this code in more detail, starting with the first few lines that we repeat here:
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
This creates a new standard Properties object to set several fields required by the engine regardless of which connector is being used.
The first is a name for the engine that will be used within the source records produced by the connector and its internal state, so use something meaningful in your application.
The connector.class field defines the name of the class that extends the Kafka Connect org.apache.kafka.connect.source.SourceConnector abstract class; in this example, we specify Debezium’s MySqlConnector class.
When a Kafka Connect connector runs, it reads information from the source and periodically records "offsets" that define how much of that information it has processed. Should the connector be restarted, it will use the last recorded offset to know where in the source information it should resume reading.
Since connectors don’t know or care how the offsets are stored, it is up to the engine to provide a way to store and recover these offsets.
The next few fields of our configuration specify that our engine should use the FileOffsetBackingStore class to store offsets in the /path/to/storage/offsets.dat file on the local file system (the file can be named anything and stored anywhere).
Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in our case, once each minute).
These fields can be tailored as needed for your application.
The next few lines define the fields that are specific to the connector (documented in per-connector docs), which in our example is the MySqlConnector connector:
    /* begin connector properties */
    props.setProperty("database.hostname", "localhost");
    props.setProperty("database.port", "3306");
    props.setProperty("database.user", "mysqluser");
    props.setProperty("database.password", "mysqlpw");
    props.setProperty("database.server.id", "85744");
    props.setProperty("topic.prefix", "my-app-connector");
    props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
    props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");
Here, we set the name of the host machine and port number where the MySQL database server is running, and we define the username and password that will be used to connect to the MySQL database. Note that for MySQL the username and password should correspond to a MySQL database user that has been granted the following MySQL permissions:
- SELECT
- RELOAD
- SHOW DATABASES
- REPLICATION SLAVE
- REPLICATION CLIENT
The first three privileges are required when reading a consistent snapshot of the databases. The last two privileges allow the connector to read the server’s binlog that is normally used for MySQL replication.
The configuration also includes a numeric identifier for the server ID (database.server.id).
Since MySQL’s binlog is part of the MySQL replication mechanism, the MySqlConnector instance must join the MySQL server group in order to read the binlog. This means that the server ID must be unique among all processes that make up the MySQL server group, and it can be any integer between 1 and 2^32-1.
In our code we set it to a fairly large but somewhat random value we’ll use only for our application.
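The permitted range for the server ID can be checked before the engine is started. The following is a minimal sketch, assuming the range of 1 to 2^32-1 described above; the class and method names are hypothetical and not part of Debezium.

```java
// Hypothetical pre-flight check, not part of Debezium.
class ServerIdCheck {

    // Largest permitted server ID: 2^32 - 1 = 4294967295. A long is needed because this exceeds int.
    static final long MAX_SERVER_ID = (1L << 32) - 1;

    /** Returns true when the ID lies in the inclusive range 1 .. 2^32-1. */
    static boolean isValid(long serverId) {
        return serverId >= 1 && serverId <= MAX_SERVER_ID;
    }

    public static void main(String[] args) {
        System.out.println(isValid(85744L));       // true: the value used in the example configuration
        System.out.println(isValid(0L));           // false: below the range
        System.out.println(isValid(4294967296L));  // false: 2^32 is one past the maximum
    }
}
```

The check covers only the numeric range. Uniqueness within the MySQL server group still has to be ensured by whoever assigns the IDs.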
The configuration also specifies a logical name for the MySQL server through the topic.prefix property. The connector includes this logical name within the topic field of every source record it produces, enabling your application to discern the origin of those records. Our example uses the name "my-app-connector"; you can use any name that is meaningful to your application.
When the MySqlConnector class runs, it reads the MySQL server’s binlog, which includes all data changes and schema changes made to the databases hosted by the server.
Since all changes to data are structured in terms of the owning table’s schema at the time the change was recorded, the connector needs to track all of the schema changes so that it can properly decode the change events.
The connector records the schema information so that, should the connector be restarted and resume reading from the last recorded offset, it knows exactly what the database schemas looked like at that offset.
How the connector records the database schema history is defined in the last two fields of our configuration, namely that our connector should use the FileSchemaHistory class to store database schema history changes in the /path/to/storage/schemahistory.dat file on the local file system (again, this file can be named anything and stored anywhere).
That completes the configuration.
(Incidentally, rather than setting the properties programmatically, we could have loaded them from a properties file.)
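As an illustration of keeping the configuration outside the code, the sketch below loads engine properties with the standard java.util.Properties.load method. The class name EngineConfigLoader is hypothetical; the demo reads from an in-memory string so that it runs without any files, whereas a real application would more likely pass a reader opened on a file of its choosing.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Debezium.
class EngineConfigLoader {

    /** Loads key=value pairs in the standard Java properties format. */
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same engine-level settings as in the example above, written as file content.
        String fileContent = String.join("\n",
                "name=engine",
                "connector.class=io.debezium.connector.mysql.MySqlConnector",
                "offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore",
                "offset.storage.file.filename=/path/to/storage/offsets.dat",
                "offset.flush.interval.ms=60000");
        Properties props = load(new StringReader(fileContent));
        System.out.println(props.getProperty("connector.class"));
    }
}
```

The resulting Properties object can be handed to the engine builder in the same way as the programmatically built one.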
Now that we have a configuration, we can create our engine. Here again are the relevant lines of code:
// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        })
        .build()) {
}
All change events will be passed to the given handler method, which must match the signature of the java.util.function.Consumer<R> functional interface, where <R> must match the type of the format specified when calling create().
Note that your application’s handler function should not throw any exceptions; if it does, the engine will log any exception thrown by the method and will continue to operate on the next source record, but your application will not have another chance to handle the particular source record that caused the exception, meaning your application might become inconsistent with the database.
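One way to keep exceptions from escaping the handler is to wrap it. The following is a minimal sketch using only java.util.function.Consumer; the class SafeHandler and the idea of collecting failed records in a list are illustrative assumptions, not a Debezium facility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper, not part of Debezium.
class SafeHandler {

    /** Wraps a handler so that a failing record is captured instead of the exception propagating. */
    static <R> Consumer<R> guarded(Consumer<R> delegate, List<R> failed) {
        return record -> {
            try {
                delegate.accept(record);
            } catch (RuntimeException e) {
                // Keep the record so the application can retry or inspect it later.
                failed.add(record);
            }
        };
    }

    public static void main(String[] args) {
        List<String> failed = new ArrayList<>();
        Consumer<String> handler = SafeHandler.<String>guarded(record -> {
            if (record.isEmpty()) {
                throw new IllegalArgumentException("empty record");
            }
            System.out.println("processed " + record);
        }, failed);

        handler.accept("event-1");
        handler.accept("");        // fails inside the delegate, captured by the wrapper
        handler.accept("event-2");
        System.out.println("failed records: " + failed.size());
    }
}
```

A wrapper like this would be passed to notifying(...) in place of the raw handler. If the handler can be invoked from several threads, the list of failed records should be a thread-safe collection.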
At this point, we have an existing DebeziumEngine object that is configured and ready to run, but it doesn’t do anything.
The DebeziumEngine is designed to be executed asynchronously by an Executor or ExecutorService:
// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
// Do something else or wait for a signal or an event
Your application can stop the engine safely and gracefully by calling its close() method:
// At some later time ...
engine.close();
Alternatively, because the engine implements the Closeable interface, close() is called automatically when the try block exits.
The engine’s connector will stop reading information from the source system, forward all remaining change events to your handler function, and flush the latest offsets to offset storage.
Only after all of this completes will the engine’s run() method return.
If your application needs to wait for the engine to completely stop before exiting, you can do this with the ExecutorService shutdown and awaitTermination methods:
try {
    executor.shutdown();
    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        logger.info("Waiting another 5 seconds for the embedded engine to shut down");
    }
}
catch ( InterruptedException e ) {
    Thread.currentThread().interrupt();
}
Alternatively, you can register a CompletionCallback when creating the DebeziumEngine to be informed when the engine terminates.
Recall that when the JVM shuts down, it only waits for non-daemon threads. Therefore, when you run the engine on a daemon thread, if your application exits, be sure to wait for the engine process to complete.
Your application should always properly stop the engine to ensure graceful and complete shutdown and that each source record is sent to the application exactly one time.
For example, do not rely upon shutting down the ExecutorService, since that interrupts the running threads.
Although the DebeziumEngine will indeed terminate when its thread is interrupted, the engine may not terminate cleanly, and when your application is restarted it may see some of the same source records that it had processed just prior to the shutdown.
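The recommended order of operations (close the engine first, then wait for its thread) can be sketched with a stand-in. StubEngine below is a hypothetical placeholder that implements Runnable and Closeable and merely blocks until it is closed; it is not a Debezium class. The sketch relies on the fact that ExecutorService.shutdown() only stops accepting new tasks, whereas shutdownNow() interrupts running threads.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not part of Debezium.
class GracefulStop {

    /** Stand-in for an engine: runs until close() is called, then finishes cleanly. */
    static class StubEngine implements Runnable, Closeable {
        private final CountDownLatch stopRequested = new CountDownLatch(1);
        volatile boolean finishedCleanly = false;

        @Override
        public void run() {
            try {
                stopRequested.await();   // pretend to stream change events until asked to stop
                finishedCleanly = true;  // reached only when stopped through close()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: not a clean stop
            }
        }

        @Override
        public void close() {
            stopRequested.countDown();
        }
    }

    /** Closes the engine first, then waits for the executor's thread to finish. */
    static boolean stop(Closeable engine, ExecutorService executor, long timeoutSeconds) {
        try {
            engine.close();
        } catch (IOException e) {
            System.err.println("engine.close() failed: " + e.getMessage());
        }
        executor.shutdown(); // accepts no new tasks and does not interrupt the running one
        try {
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        StubEngine engine = new StubEngine();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        boolean terminated = stop(engine, executor, 5);
        System.out.println("terminated=" + terminated + ", clean=" + engine.finishedCleanly);
    }
}
```

With a real engine, the same stop(...) helper would receive the DebeziumEngine instance, since the engine is both run by the executor and closed through the Closeable interface.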
As mentioned earlier, there are two implementations of the DebeziumEngine interface.
The two implementations use the same API, and the preceding code sample is valid for both versions.
The only exception is the creation of the DebeziumEngine instance.
As was also mentioned in the introduction, by default, the AsyncEmbeddedEngine implementation is used.
Therefore, the method DebeziumEngine.create(Json.class) results internally in the use of the AsyncEmbeddedEngine instance.
Output Message Formats
DebeziumEngine#create() can accept multiple different parameters that affect the format in which the messages are received by the consumer.
Allowed values are:
- Connect.class - the output value is a change event wrapping Kafka Connect’s SourceRecord
- Json.class - the output value is a pair of key and value encoded as JSON strings
- JsonByteArray.class - the output value is a pair of key and value formatted as JSON and encoded as UTF-8 byte arrays
- Avro.class - the output value is a pair of key and value encoded as Avro serialized records (see Avro Serialization for more details)
- CloudEvents.class - the output value is a pair of key and value encoded as Cloud Events messages
The header format can also be specified when calling DebeziumEngine#create().
Allowed values are:
- Json.class - the header values are encoded as JSON strings
- JsonByteArray.class - the header values are formatted as JSON and encoded as UTF-8 byte arrays
Internally, the engine delegates data conversion to the Kafka Connect or Apicurio converter implementation with the algorithm that is best suited to perform the conversion. The converter can be parametrized using engine properties to modify its behaviour.
An example of the JSON output format is:
final Properties props = new Properties();
...
props.setProperty("converter.schemas.enable", "false"); // don't include schema in message
...
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
    .using(props)
    .notifying((records, committer) -> {
        for (ChangeEvent<String, String> r : records) {
            System.out.println("Key = '" + r.key() + "' value = '" + r.value() + "'");
            committer.markProcessed(r);
        }
...
where the ChangeEvent data type is the key/value pair.
Message transformations
Before the messages are delivered to the handler, it is possible to run them through a pipeline of Kafka Connect Simple Message Transforms (SMT).
Each SMT can pass the message unchanged, modify it, or filter it out.
The chain is configured using the transforms property.
The property contains a comma-separated list of logical names of the transformations to be applied.
For each transformation, the transforms.<logical_name>.type property defines the name of the implementation class, and the transforms.<logical_name>.* properties define configuration options that are passed to the transformation.
An example of the configuration is:
final Properties props = new Properties();
...
props.setProperty("transforms", "filter, router");                                               // (1)
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");  // (2)
props.setProperty("transforms.router.regex", "(.*)");                                            // (3)
props.setProperty("transforms.router.replacement", "trf$1");                                     // (3)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");      // (4)
- (1) Two transformations are defined - filter and router
- (2) Implementation of the router transformation is org.apache.kafka.connect.transforms.RegexRouter
- (3) The router transformation has two configuration options - regex and replacement
- (4) Implementation of the filter transformation is io.debezium.embedded.ExampleFilterTransform
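To make the naming convention concrete, the sketch below reads the transforms property and looks up the .type entry for each logical name, preserving the order of the chain. It uses only java.util.Properties; the class TransformChainReader is hypothetical and is not how the engine itself parses the configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of the property naming scheme, not part of Debezium.
class TransformChainReader {

    /** Maps each logical transformation name, in chain order, to its implementation class name. */
    static Map<String, String> chain(Properties props) {
        Map<String, String> result = new LinkedHashMap<>();
        String names = props.getProperty("transforms", "");
        for (String raw : names.split(",")) {
            String name = raw.trim(); // tolerate spaces after commas, as in "filter, router"
            if (!name.isEmpty()) {
                result.put(name, props.getProperty("transforms." + name + ".type"));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("transforms", "filter, router");
        props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
        props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");
        chain(props).forEach((name, type) -> System.out.println(name + " -> " + type));
    }
}
```

The insertion-ordered map reflects that the transformations are applied in the order in which their logical names are listed.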
Message Transformation Predicates
Predicates can be applied to transformations to make them conditional, so that a transformation is applied only to records that satisfy the predicate.
An example of the configuration is:
final Properties props = new Properties();
...
props.setProperty("transforms", "filter");                                                 // (1)
props.setProperty("predicates", "headerExists");                                           // (2)
props.setProperty("predicates.headerExists.type", "org.apache.kafka.connect.transforms.predicates.HasHeaderKey"); //(3)
props.setProperty("predicates.headerExists.name", "header.name");                          // (4)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");// (5)
props.setProperty("transforms.filter.predicate", "headerExists");                          // (6)
props.setProperty("transforms.filter.negate", "true");                                     // (7)
- (1) One transformation is defined - filter
- (2) One predicate is defined - headerExists
- (3) Implementation of the headerExists predicate is org.apache.kafka.connect.transforms.predicates.HasHeaderKey
- (4) The headerExists predicate has one configuration option - name
- (5) Implementation of the filter transformation is io.debezium.embedded.ExampleFilterTransform
- (6) The filter transformation requires the predicate headerExists
- (7) The filter transformation expects the value of the predicate to be negated, making the predicate determine if the header does not exist
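The interaction of predicate and negate can be modelled in a few lines. The sketch below is a simplified stand-in built on java.util.function types, not the Kafka Connect implementation: records are represented as a map of header names, and the hypothetical transformation drops every record it is applied to by returning null.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical model of predicate gating, not the Kafka Connect implementation.
class PredicateGate {

    /** Applies the transformation only when the predicate result, optionally negated, is true. */
    static <R> R apply(R record, UnaryOperator<R> transform, Predicate<R> predicate, boolean negate) {
        boolean matches = predicate.test(record);
        boolean shouldApply = negate ? !matches : matches;
        return shouldApply ? transform.apply(record) : record;
    }

    public static void main(String[] args) {
        Predicate<Map<String, String>> headerExists = headers -> headers.containsKey("header.name");
        UnaryOperator<Map<String, String>> dropRecord = headers -> null; // null models a filtered-out record

        Map<String, String> withHeader = Collections.singletonMap("header.name", "x");
        Map<String, String> withoutHeader = Collections.emptyMap();

        // negate = true: the transformation runs only for records that lack the header.
        System.out.println(apply(withHeader, dropRecord, headerExists, true));    // record passes through
        System.out.println(apply(withoutHeader, dropRecord, headerExists, true)); // null: record dropped
    }
}
```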
Advanced Record Consuming
For some use cases, such as when trying to write records in batches or against an async API, the functional interface described above may be challenging.
In these situations, it may be easier to use the io.debezium.engine.DebeziumEngine.ChangeConsumer<R> interface.
This interface has a single method with the following signature:
/**
  * Handles a batch of records, calling the {@link RecordCommitter#markProcessed(Object)}
  * for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.
  * @param records the records to be processed
  * @param committer the committer that indicates to the system that we are finished
  */
 void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;
As mentioned in the Javadoc, the RecordCommitter object is to be called for each record and once each batch is finished.
The RecordCommitter interface is threadsafe, which allows for flexible processing of records.
You can optionally overwrite the offsets of the records that are processed. This is done by first building a new
Offsets object by calling RecordCommitter#buildOffsets(), updating the offsets with Offsets#set(String key, Object value),
and then calling RecordCommitter#markProcessed(SourceRecord record, Offsets sourceOffsets),
with the updated Offsets.
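The calling convention described above (mark each record, then mark the batch once) can be sketched without the Debezium classes. In the example below, Committer is a hypothetical stand-in that mirrors only the two RecordCommitter calls named in the Javadoc, and the sink list stands for any destination that accepts bulk writes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of batch handling, not the Debezium API.
class BatchHandlingSketch {

    /** Stand-in for the two RecordCommitter calls named in the Javadoc. */
    interface Committer<R> {
        void markProcessed(R record);
        void markBatchFinished();
    }

    /** Writes the whole batch to the sink in one call, then acknowledges it. */
    static <R> void handleBatch(List<R> records, Committer<R> committer, List<R> sink) {
        sink.addAll(records);                 // one bulk write instead of one write per record
        for (R record : records) {
            committer.markProcessed(record);  // acknowledge each record individually
        }
        committer.markBatchFinished();        // signal the end of the batch exactly once
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        List<String> calls = new ArrayList<>();
        Committer<String> recording = new Committer<String>() {
            @Override
            public void markProcessed(String record) {
                calls.add("processed:" + record);
            }

            @Override
            public void markBatchFinished() {
                calls.add("finished");
            }
        };
        handleBatch(Arrays.asList("a", "b"), recording, sink);
        System.out.println(calls); // [processed:a, processed:b, finished]
    }
}
```

Acknowledging records only after the bulk write has succeeded means that a failed write leaves the batch unacknowledged, which matches the at-least-once behavior discussed in the failure handling section.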
To use the ChangeConsumer API, you must pass an implementation of the interface to the notifying API, as seen below:
class MyChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
  public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
    ...
  }
}
// Create the engine with this configuration ...
DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(props)
        .notifying(new MyChangeConsumer())
        .build();
If JSON format is used (an equivalent would work for other formats too) then the code would look like:
class JsonChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
  public void handleBatch(List<ChangeEvent<String, String>> records,
    RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
    ...
  }
}
// Create the engine with this configuration ...
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(new JsonChangeConsumer())
        .build();
Engine Properties
The following configuration properties are required unless a default value is available (for the sake of text formatting the package names of Java classes are replaced with <…>).
| Property | Default | Description |
|---|---|---|
| | | Unique name for the connector instance. |
| | | The name of the Java class for the connector, e.g |
| | | The name of the Java class that is responsible for persistence of connector offsets. It must implement |
| | | Path to file where offsets are to be stored. Required when |
| | | The name of the Kafka topic where offsets are to be stored. Required when |
| | | The number of partitions used when creating the offset storage topic. Required when |
| | | Replication factor used when creating the offset storage topic. Required when |
| | | The name of the Java class of the commit policy. It defines when offsets commit has to be triggered based on the number of events processed and the time elapsed since the last commit. This class must implement the interface |
| | | Interval at which to try committing offsets. The default is 1 minute. |
| | | Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. The default is 5 seconds. |
| | | The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries). |
| | | Initial delay (in ms) for retries when encountering connection errors. This value will be doubled upon every retry but won’t exceed |
| | | Max delay (in ms) between retries when encountering connection errors. |
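The retry-related rows of the table describe a doubling delay with an upper bound and a retry limit with two special values. The following is a minimal sketch of that arithmetic, assuming that the first retry waits for the initial delay; the class name and the values 300 and 10000 are illustrative, and the engine's own implementation may differ in detail.

```java
// Hypothetical sketch of the retry arithmetic, not the engine's implementation.
class RetryBackoff {

    /** Delay before the given retry (1-based): the initial delay, doubled per further retry, capped at maxMs. */
    static long delayMs(long initialMs, long maxMs, int retry) {
        long delay = Math.min(initialMs, maxMs);
        for (int i = 1; i < retry && delay < maxMs; i++) {
            delay = Math.min(delay * 2, maxMs);
        }
        return delay;
    }

    /** Interprets the retry limit as described: -1 = no limit, 0 = disabled, > 0 = number of retries. */
    static boolean shouldRetry(int maxRetries, int retriesSoFar) {
        if (maxRetries == -1) {
            return true;
        }
        return retriesSoFar < maxRetries;
    }

    public static void main(String[] args) {
        // Illustrative values: 300 ms initial delay, 10000 ms maximum delay.
        for (int retry = 1; retry <= 8; retry++) {
            System.out.println("retry " + retry + ": " + delayMs(300, 10_000, retry) + " ms");
        }
    }
}
```

With these illustrative values the delay grows from 300 ms to 9600 ms over six retries and stays at the 10000 ms cap from the seventh retry onward.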
Asynchronous Engine Properties
| Property | Default | Description |
|---|---|---|
| | Threads allocated on demand, based on the workload and the number of available CPU cores. | The number of threads that are available to process change event records. If no value is specified (the default), the engine uses the Java ThreadPoolExecutor to dynamically adjust the number of threads, based on the current workload. Maximum number of threads is number of CPU cores on given machine. If a value is specified, the engine uses the Java fixed thread pool method to create a thread pool with the specified number of threads. To use all available cores on given machine, set the placeholder value, |
| | 1000 | Maximum time in milliseconds to wait for processing submitted records after a task shutdown is called. |
| | | Determines how the records should be produced. The non-sequential processing of the |
| | | Specifies whether the default |
| | 180,000 (3 min) | Time, in milliseconds, that the engine waits for a task’s lifecycle management operations (starting and stopping) to complete. |
Database schema history properties
Some connectors also require an additional set of properties that configure the database schema history:
- MySQL
- SQL Server
- Oracle
- Db2
Without proper configuration of the database schema history the connectors will refuse to start. The default configuration expects a Kafka cluster to be available. For other deployments, a file-based database schema history storage implementation is available.
| Property | Default | Description |
|---|---|---|
| | | The name of the Java class that is responsible for persistence of the database schema history. |
| | | Path to a file where the database schema history is stored. |
| | | The Kafka topic where the database schema history is stored. |
| | | The initial list of Kafka cluster servers to connect to. The cluster provides the topic to store the database schema history. |
Handling Failures
When the engine executes, its connector actively records the source offset inside each source record, and the engine periodically flushes those offsets to persistent storage. When the application and engine are restarted, whether after a normal shutdown or after a crash, the engine and its connector resume reading the source information from the last recorded offset.
So, what happens when your application fails while an embedded engine is running?
The net effect is that the application will likely receive some source records after restart that it had already processed right before the crash.
How many depends upon how frequently the engine flushes offsets to its store (via the offset.flush.interval.ms property) and how many source records the specific connector returns in one batch.
The best case is that the offsets are flushed every time (e.g., offset.flush.interval.ms is set to 0), but even then the embedded engine will still only flush the offsets after each batch of source records is received from the connector.
For example, the MySQL connector uses the max.batch.size property to specify the maximum number of source records that can appear in a batch.
Even when offset.flush.interval.ms is set to 0, an application that restarts after a crash may see up to n duplicates, where n is the size of the batches.
If the offset.flush.interval.ms property is set higher, then the application may see up to n * m duplicates, where n is the maximum size of the batches and m is the number of batches that might accumulate during a single offset flush interval.
(Obviously it is possible to configure embedded connectors to use no batching and to always flush offsets, resulting in an application never receiving any duplicate source records.
However, this dramatically increases the overhead and decreases the throughput of the connectors.)
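The n * m bound can be illustrated with a small model. The sketch below is a simplification that treats the flush interval as a fixed number of batches and assumes the offset is flushed immediately after every m-th complete batch; the class and method names are hypothetical.

```java
// Hypothetical model of the duplicate window, not part of Debezium.
class DuplicateWindow {

    /** Upper bound on replayed records: n records per batch times m batches per flush interval. */
    static long maxDuplicates(int maxBatchSize, int batchesPerFlush) {
        return (long) maxBatchSize * batchesPerFlush;
    }

    /** Records delivered since the last flushed offset when a crash hits after `processed` records. */
    static long replayedAfterCrash(int batchSize, int batchesPerFlush, long processed) {
        long completeBatches = processed / batchSize;
        // Only whole groups of `batchesPerFlush` batches have had their offsets flushed.
        long flushedBatches = (completeBatches / batchesPerFlush) * batchesPerFlush;
        long lastFlushedOffset = flushedBatches * batchSize;
        return processed - lastFlushedOffset;
    }

    public static void main(String[] args) {
        int n = 4; // batch size
        int m = 3; // batches per flush interval
        System.out.println("bound = " + maxDuplicates(n, m));                          // 12
        System.out.println("crash after 11 -> " + replayedAfterCrash(n, m, 11));       // 11
        System.out.println("crash after 12 -> " + replayedAfterCrash(n, m, 12));       // 0
        System.out.println("crash after 13 -> " + replayedAfterCrash(n, m, 13));       // 1
    }
}
```

In this model the replayed count never exceeds n * m, and it drops back to zero each time a flush completes.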
The bottom line is that when using embedded connectors, applications will receive each source record exactly once during normal operation (including restart after a graceful shutdown), but do need to be tolerant of receiving duplicate events immediately following a restart after a crash or improper shutdown. If applications need more rigorous exactly-once behavior, then they should use the full Debezium platform that can provide exactly-once guarantees (even after crashes and restarts).
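One common way to tolerate duplicates is to make the application's processing idempotent. The following is a minimal single-threaded sketch, assuming that each event can be associated with a monotonically increasing position (for example, one derived from its source offset); the position parameter and the class IdempotentApplier are hypothetical and not part of the Debezium API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical idempotent consumer, not part of Debezium.
class IdempotentApplier {

    private long lastApplied = -1;                      // highest position already applied
    private final List<String> applied = new ArrayList<>();

    /** Applies the event unless its position was already seen; returns true if it was applied. */
    boolean apply(long position, String payload) {
        if (position <= lastApplied) {
            return false;                               // duplicate replayed after a restart: skip it
        }
        applied.add(payload);
        lastApplied = position;
        return true;
    }

    List<String> applied() {
        return applied;
    }

    public static void main(String[] args) {
        IdempotentApplier applier = new IdempotentApplier();
        applier.apply(1, "insert customer 1001");
        applier.apply(2, "update customer 1001");
        // After a crash and restart, the engine may deliver the same events again.
        applier.apply(1, "insert customer 1001");
        applier.apply(2, "update customer 1001");
        applier.apply(3, "delete customer 1001");
        System.out.println(applier.applied().size());   // 3
    }
}
```

In a real application, the last applied position would need to be persisted together with the effect of the event, so that it survives the same crash that triggers the replay.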