One of the typical Debezium use cases is using change data capture to integrate a legacy system with other systems in the organization. There are multiple ways to achieve this goal:

  • Write data to Kafka using Debezium and follow with a combination of Kafka Streams pipelines and Kafka Connect connectors to deliver the changes to other systems

  • Use the Debezium embedded engine in a standalone Java application and write the integration code in plain Java; this is often used to send change events to alternative messaging infrastructure such as Amazon Kinesis, Google Pub/Sub, etc.

  • Use an existing integration framework or service bus to express the pipeline logic

This article focuses on the third option: a dedicated integration framework.

Apache Camel

Camel is an open-source integration framework that enables developers to read, transform, route and write data from and to disparate systems and services. It provides a large number of ready-made components that either provide interfaces to third-party systems or implement enterprise integration patterns.

This combination allows the developer to easily connect to target systems and express integration pipelines using a declarative DSL.

Camel and Debezium

Camel 3 was released at the end of 2019. In addition to the major re-architecture, new Debezium components were added to the codebase. The release also enables Camel to be used as a connector in the Kafka Connect runtime.

This post focuses solely on the use of the Debezium components; the latter option will be covered in a future post.

Every non-incubating Debezium connector is represented by its own dedicated component. The advantage of this approach is the complete isolation of dependencies and the type-safe configuration of connector instances.

Internally, the component exposes a Debezium endpoint with an event-driven Camel consumer that encapsulates an instance of the Debezium embedded engine.

An Example

As an example, we’ve built a simple Question & Answer (Q & A) application, loosely inspired by StackOverflow and the like. A REST API allows posting new questions as well as answers to existing questions, which are stored in a database.

Any data changes generated by the application (e.g. when a new question or answer is created) are captured via Debezium and passed to the Camel pipeline, which sends emails via an SMTP server and posts a corresponding tweet on a provided Twitter account.

You can find the complete source code of the example on GitHub.

Topology

There are multiple components in the solution topology:

Figure 1. The Deployment Topology

  • The Q & A application is implemented using the Quarkus stack and exposes a REST API to create questions and answers

  • The application stores its data in the PostgreSQL database

  • The Camel route runs as a plain Java application that uses an embedded Infinispan store to persist its state (used to build an aggregate object linking questions to their answers) and sends messages about answered questions via e-mail and to an associated Twitter account

  • A MailHog SMTP server running in a container for sending e-mails

Question & Answer Application

The source application is a simple REST service based on Quarkus. It manages two entities, Question and Answer, with a 1:n relation stored in the PostgreSQL database.

Figure 2. The Q&A Backend Service Entity Relationship Diagram

The entities are created using the REST API, and the association between them is established automatically.

The Camel Pipeline

The Camel pipeline is an expression of the following business rules:

  • For every question created or updated, send an email to the question creator

  • For every answer created or updated, send an email to both the question creator and the answer creator

  • When a question reaches three answers, post a tweet about it on a dedicated Twitter account
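
The three rules above can be sketched in plain Java, independent of Camel. This is only an illustration: the class `QaRulesSketch`, its method names, and the `mail:`/`tweet:` action strings are hypothetical, and a `HashMap` stands in for the Infinispan-backed store. It also assumes that the tweet is triggered only when a newly added answer brings the count to exactly three, which is one possible reading of the third rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the business rules; not the code used in the example project. */
class QaRulesSketch {

    /** Number of answers at which a tweet is posted (from the third rule). */
    static final int TWEET_THRESHOLD = 3;

    /** Aggregate linking a question to its answers, keyed by answer id. */
    static final class QuestionAggregate {
        final long questionId;
        String creatorEmail;
        final Map<Long, String> answerEmailsById = new LinkedHashMap<>();

        QuestionAggregate(long questionId) {
            this.questionId = questionId;
        }
    }

    /** In-memory stand-in for the Infinispan-backed message store (assumption). */
    private final Map<Long, QuestionAggregate> store = new HashMap<>();

    /** Rule 1: a created or updated question notifies the question creator. */
    List<String> onQuestionChanged(long questionId, String creatorEmail) {
        // Create the aggregate on first sight, otherwise update the stored one.
        QuestionAggregate aggregate =
                store.computeIfAbsent(questionId, id -> new QuestionAggregate(id));
        aggregate.creatorEmail = creatorEmail;

        List<String> actions = new ArrayList<>();
        actions.add("mail:" + creatorEmail);
        return actions;
    }

    /** Rules 2 and 3: notify both creators and tweet once three answers exist. */
    List<String> onAnswerChanged(long questionId, long answerId, String answerEmail) {
        QuestionAggregate aggregate = store.get(questionId);
        if (aggregate == null) {
            throw new IllegalStateException("No aggregate for question " + questionId);
        }
        // put() returns null only if this answer id was not stored before,
        // so an updated answer does not count as a new one.
        boolean isNewAnswer = aggregate.answerEmailsById.put(answerId, answerEmail) == null;

        List<String> actions = new ArrayList<>();
        actions.add("mail:" + aggregate.creatorEmail);
        actions.add("mail:" + answerEmail);
        if (isNewAnswer && aggregate.answerEmailsById.size() == TWEET_THRESHOLD) {
            actions.add("tweet:question " + questionId + " has many answers");
        }
        return actions;
    }

    public static void main(String[] args) {
        QaRulesSketch rules = new QaRulesSketch();
        System.out.println(rules.onQuestionChanged(1L, "asker@example.com"));
        for (long answerId = 10; answerId <= 12; answerId++) {
            System.out.println(
                    rules.onAnswerChanged(1L, answerId, "answerer" + answerId + "@example.com"));
        }
    }
}
```

Returning the actions as a list keeps the rule logic separate from the delivery mechanism, which is the same separation the Camel route achieves with dedicated mail and Twitter endpoints.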

The business requirements are transformed into a pipeline described by this EIP chart:

Figure 3. The Camel Pipeline

Code Walkthrough

To use the Debezium Camel component, we need to add at least the following dependencies to the pom.xml file:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-bom</artifactId>
      <version>${version.camel}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>

    <!-- Use required Debezium version -->
    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-connector-postgres</artifactId>
      <version>${version.debezium}</version>
    </dependency>
    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-embedded</artifactId>
      <version>${version.debezium}</version>
    </dependency>
    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-core</artifactId>
      <version>${version.debezium}</version>
    </dependency>

  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-debezium-postgres</artifactId>
  </dependency>
</dependencies>

The pipeline logic itself is defined in the QaDatabaseUserNotifier class. Its main route looks like this:

public class QaDatabaseUserNotifier extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    from("debezium-postgres:localhost?"
        + "databaseHostname={{database.hostname}}"
        + "&databasePort={{database.port}}"
        + "&databaseUser={{database.user}}"
        + "&databasePassword={{database.password}}"
        + "&databaseDbname=postgres"
        + "&databaseServerName=qa"
        + "&schemaWhitelist={{database.schema}}"
        + "&tableWhitelist={{database.schema}}.question,{{database.schema}}.answer"
        + "&offsetStorage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
        .routeId(QaDatabaseUserNotifier.class.getName() + ".DatabaseReader")     (1)
        .log(LoggingLevel.DEBUG, "Incoming message ${body} with headers ${headers}")
        .choice()                                                                (2)
          .when(isQuestionEvent)
            .filter(isCreateOrUpdateEvent)                                       (3)
              .convertBodyTo(Question.class)                                     (4)
              .log(LoggingLevel.TRACE, "Converted to logical class ${body}")
              .bean(store, "readFromStoreAndUpdateIfNeeded")                     (5)
              .to(ROUTE_MAIL_QUESTION_CREATE)                                    (6)
            .endChoice()
          .when(isAnswerEvent)
            .filter(isCreateOrUpdateEvent)
              .convertBodyTo(Answer.class)
              .log(LoggingLevel.TRACE, "Converted to logical class ${body}")
              .bean(store, "readFromStoreAndAddAnswer")
              .to(ROUTE_MAIL_ANSWER_CHANGE)
              .filter(hasManyAnswers)                                            (7)
                .setBody().simple("Question '${exchangeProperty[aggregate].text}' has " +
                    "many answers (generated at " + Instant.now() + ")")
                .to(TWITTER_SERVER)
              .end()
            .endChoice()
          .otherwise()
            .log(LoggingLevel.WARN, "Unknown type ${headers[" +
                DebeziumConstants.HEADER_IDENTIFIER + "]}")
        .endParent();

    from(ROUTE_MAIL_QUESTION_CREATE)                                             (6)
      .routeId(QaDatabaseUserNotifier.class.getName() + ".QuestionNotifier")
      .setHeader("To").simple("${body.email}")
      .setHeader("Subject").simple("Question created/edited")
      .setBody().simple("Question '${body.text}' was created or edited")
      .to(SMTP_SERVER);
  }

  @Converter
  public static class Converters {

    @Converter
    public static Question questionFromStruct(Struct struct) {                   (4)
      return new Question(struct.getInt64("id"), struct.getString("text"),
          struct.getString("email"));
    }

    @Converter
    public static Answer answerFromStruct(Struct struct) {                       (4)
      return new Answer(struct.getInt64("id"), struct.getString("text"),
          struct.getString("email"), struct.getInt64("question_id"));
    }
  }
}
1 from is the Debezium source endpoint. The URI parts map directly to connector configuration options.
2 The pipeline logic is split depending on the change event type. The recognition is based on the CamelDebeziumIdentifier header, which contains the identifier (<server_name>.<schema_name>.<table_name>) of the source table.
3 The pipeline processes only create and update events. The recognition is based on the CamelDebeziumOperation header, which contains the op field of the message Envelope.
4 The Kafka Connect Struct type is converted into a logical type used in the pipeline. The conversion is performed by a custom Camel converter. It is possible to use the out-of-the-box DebeziumTypeConverter, which converts a Struct into a Map, but this tightly couples the pipeline logic to the table structure.
5 A store bean is invoked that communicates with a message store based on an Infinispan cache to build a message aggregate. The message store checks whether it already has the question stored. If not, a new aggregate is created and stored; otherwise the stored aggregate is updated with the new data.
6 A supplementary route is invoked that formats a mail message and delivers it to the question creator via the SMTP endpoint.
7 The route part related to the answer message type is very similar (answers are added to question aggregate). The main difference is the posting of a Twitter message when the aggregate contains three answers.
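
The predicates used in the route (isQuestionEvent, isAnswerEvent, isCreateOrUpdateEvent) are not shown in the listing. As a rough, Camel-free illustration of the header checks described in callouts 2 and 3, the sketch below evaluates the two headers held in a plain `Map`. The class `EventPredicatesSketch`, the helper `forTable` and the schema name `public` are assumptions made for this example; the header names come from the callouts, and the op codes are the ones Debezium uses in its change event envelope (`c` create, `u` update, `d` delete, `r` snapshot read). Rejecting snapshot reads is a choice of this sketch, not necessarily of the example project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch of the header-based checks; not the project's actual predicates. */
class EventPredicatesSketch {

    static final String HEADER_IDENTIFIER = "CamelDebeziumIdentifier";
    static final String HEADER_OPERATION = "CamelDebeziumOperation";

    /** Matches events whose identifier equals <server_name>.<schema_name>.<table_name>. */
    static Predicate<Map<String, Object>> forTable(String server, String schema, String table) {
        String expected = server + "." + schema + "." + table;
        return headers -> expected.equals(headers.get(HEADER_IDENTIFIER));
    }

    /** Accepts op codes "c" and "u"; deletes ("d") and snapshot reads ("r") are rejected. */
    static final Predicate<Map<String, Object>> IS_CREATE_OR_UPDATE = headers -> {
        Object op = headers.get(HEADER_OPERATION);
        return "c".equals(op) || "u".equals(op);
    };

    public static void main(String[] args) {
        // Headers as they might look for a newly inserted question row.
        Map<String, Object> headers = new HashMap<>();
        headers.put(HEADER_IDENTIFIER, "qa.public.question");
        headers.put(HEADER_OPERATION, "c");

        Predicate<Map<String, Object>> isQuestionEvent = forTable("qa", "public", "question");
        System.out.println(isQuestionEvent.and(IS_CREATE_OR_UPDATE).test(headers));
    }
}
```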

On a side note, for the sake of simplicity, the example currently uses volatile memory to store the Debezium offsets. For persistent storage you could either use a file-based offset store or create a custom offset store implementation based on Infinispan, delegating the storage of offsets to the underlying cache.
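
As a sketch of the file-based variant, the endpoint URI could swap the in-memory store for Kafka Connect's FileOffsetBackingStore. The snippet assumes the component's `offsetStorageFileName` option for the file location; the helper class `OffsetStoreUriSketch` and the path `/tmp/qa-offsets.dat` are hypothetical, so check the option names against the Camel version you use.

```java
/** Hypothetical helper that builds the offset-related part of the endpoint URI. */
class OffsetStoreUriSketch {

    /** Returns the URI options that switch offset storage to a local file. */
    static String fileOffsetOptions(String offsetFile) {
        return "&offsetStorage=org.apache.kafka.connect.storage.FileOffsetBackingStore"
                + "&offsetStorageFileName=" + offsetFile;
    }

    public static void main(String[] args) {
        // Shortened URI; the database options from the route above are omitted here.
        String uri = "debezium-postgres:localhost?databaseServerName=qa"
                + fileOffsetOptions("/tmp/qa-offsets.dat"); // example path (assumption)
        System.out.println(uri);
    }
}
```

With a file-backed store, the offsets survive a restart of the Camel application as long as the file itself is kept, for instance on a mounted volume when running in a container.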

Demo

In order to run the demo, you need to have a Twitter developer account with appropriate API keys and secrets.

Go to the application directory and build all components:

$ mvn clean install

Start the services (provide your own Twitter API credentials):

$ env TWITTER_CONSUMER_KEY=<...> TWITTER_CONSUMER_SECRET=<...> TWITTER_ACCESS_TOKEN=<...> TWITTER_ACCESS_TOKEN_SECRET=<...> docker-compose up

In another terminal, create a question and three answers to it:

$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/ -d @src/test/resources/messages/create-question.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer1.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer2.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer3.json

The Twitter account should contain a new tweet with a text like "Question 'How many legs does a dog have?' has many answers (generated at 2020-02-17T08:02:33.744Z)". The MailHog server UI should also display messages like these:

Figure 4. The MailHog Messages

Conclusion

Apache Camel is a very interesting option for implementing system integration scenarios.

Without the need for any external messaging infrastructure, it is very easy to deploy a standalone Camel route with the Debezium component, enabling the capture of data changes and the execution of complex routing and transformation operations on them. Camel equips the developer with a full arsenal of enterprise integration pattern implementations, as well as more than a hundred connectors for different systems that can be included in a complex service orchestration.

The source code of the full example is available on GitHub.

About Debezium

Debezium is an open-source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open-source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have an idea on how we can improve Debezium, please let us know or log an issue.

Jiri Pechanec

Jiri is a software developer (and a former quality engineer) at Red Hat. He spent most of his career with Java and system integration projects and tasks. He lives near Brno, Czech Republic.

   
