 
 Welcome to this series of articles dedicated to signaling and notifications in Debezium! This post serves as the second installment in the series, where we will discuss how to customize the signal and notification channels in Debezium.
Debezium 2.3 introduced improvements to its signaling and notification capabilities. In addition to the pre-defined signal and notification channels that Debezium offers, you can now set up your own. This lets you tailor the system to your needs and integrate it with existing infrastructure or third-party solutions, so that signal events are captured precisely and notifications are delivered through your preferred channels.
The first article in this series, Signaling and Notifications in Debezium, provides an overview of the signaling and notification features in Debezium. It also discusses the available channels & their use cases for various scenarios.
Custom Signal & Notification Channel
In Debezium, you can customize the signal and notification channels to fit specific requirements. For example, you can create an HTTP channel for both signals and notifications. This HTTP channel receives signals from an HTTP endpoint, and notifications can be sent back to an HTTP endpoint once the signal has been delivered.
Let’s explore an example that demonstrates how to create and use HTTP signal and notification channels with the Debezium Postgres connector, a Mock Server to send signals, and Postbin to receive notifications via an HTTP endpoint.
Setting up the HTTP Signal Channel:
-  Configure the Debezium Postgres connector to receive signals whenever a relevant database change occurs.
-  Set up a service to send signals to Debezium using the HTTP channel. The service can be a database, a third-party application, or any other system that can send HTTP requests. In this example, we will use the Mock Server to send signals to Debezium. The Mock Server is a service that can be used to mock HTTP requests and responses.
-  Configure the Mock Server to send the signals via an HTTP endpoint using the appropriate HTTP method (e.g., POST).
-  Customize the HTTP channel settings to define the HTTP endpoint URL, authentication, headers, and any additional parameters as needed.
Setting up the HTTP Notification Channel:
-  Once the signal is received and processed by Debezium, it can trigger the posting of a notification to an HTTP endpoint. In this example, we will send the notification to a Postbin bin using the HTTP channel. Postbin is a service that can be used to receive HTTP requests and view the request details.
-  Customize the HTTP channel settings for notifications, create a bin in Postbin, and define the HTTP endpoint URL, authentication, headers, and any additional parameters as needed.
-  Forward the notification event to the HTTP endpoint, i.e. the Postbin bin, using the appropriate HTTP method (e.g., POST). The notification payload can be customized as needed.
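The signal half of the flow above can be tried out locally before any containers are involved. The following is a minimal sketch, not part of the example project: it uses the JDK's built-in HTTP server as a stand-in for the Mock Server and fetches one signal from it with the JDK HTTP client. The class name, the sample body, and the use of a random local port are all assumptions made for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class LocalSignalEndpointSketch {

    // Sample signal body (an assumption for this sketch, shaped like the signals used in this post).
    static final String SIGNAL_BODY =
            "{\"id\":\"1\",\"type\":\"log\",\"data\":{\"message\":\"hello\"}}";

    /** Starts a throwaway endpoint, fetches the signal once, and returns "<status> <body>". */
    static String fetchSignalOnce() {
        HttpServer server;
        try {
            // Port 0 asks the operating system for any free port.
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        server.createContext("/api/signal", exchange -> {
            byte[] body = SIGNAL_BODY.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        try {
            int port = server.getAddress().getPort();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:" + port + "/api/signal?code=10969"))
                    .GET()
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() + " " + response.body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchSignalOnce());
    }
}
```

A stand-in like this only exercises the request and response handling; it says nothing about how Debezium schedules or processes the signals it receives.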
The complete source code for the example in this blog post is available in the Debezium examples repository under the http-signal-notification directory.
Create a Java project to build the HTTP signal and notification channels. Run the following command to create a new Java project using Maven:
mvn archetype:generate \
    -DgroupId=io.debezium.examples \
    -DartifactId=http-signaling-notification
Add the following dependency to the pom.xml file, using Debezium version 2.3 or later:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>2.3.0.Final</version>
</dependency>
To receive signals using a Mock Server, create a Docker Compose file that defines the Mock Server service. The service is configured as follows:
services:
  mockServer:
    image: mockserver/mockserver:latest
    ports:
      - 1080:1080
    environment:
      - MOCKSERVER_WATCH_INITIALIZATION_JSON=true
      - MOCKSERVER_INITIALIZATION_JSON_PATH=/config/initializerJson.json
    volumes:
      - ./initializerJson.json:/config/initializerJson.json
The environment variables MOCKSERVER_WATCH_INITIALIZATION_JSON and MOCKSERVER_INITIALIZATION_JSON_PATH tell the Mock Server to watch the initialization JSON file for changes and where to find it. The initializerJson.json file, which contains the HTTP request and response information for the signals, is mounted into the Mock Server container.
The initializerJson.json file defines a mock HTTP GET request to the path /api/signal with the query string parameter code=10969. When the Mock Server receives this request, it responds with status code 200 and a JSON body containing an id, type, and data. The initializerJson.json file is defined as follows:
[
  {
    "httpRequest" : {
      "method" : "GET",
      "path" : "/api/signal",
      "queryStringParameters" : {
        "code" : ["10969"]
      }
    },
    "httpResponse" : {
      "body": "{\"id\":\"924e3ff8-2245-43ca-ba77-2af9af02fa07\",\"type\":\"log\",\"data\":{\"message\": \"Signal message received from http endpoint.\"}}",
      "statusCode": 200
    }
  }
]
-  id: an arbitrary unique string that identifies a signal instance.
-  type: the type of signal to send. In this example, the type is log, and it requests the connector to add an entry to the connector’s log file. After the signal is processed, the connector prints the specified message in the log.
-  data: the JSON-formatted parameters to pass to a signal event. In this example, the message parameter is passed to the signal event.
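Note that the body value in initializerJson.json is itself a JSON document stored as an escaped string. The sketch below shows one way such a body could be composed from the three fields. The class and method names are hypothetical, and the hand-rolled escaping only handles backslashes and double quotes (it also assumes the id and type contain no special characters), so a JSON library would be the safer choice in real code.

```java
public class SignalBodySketch {

    /** Builds the signal JSON from its three fields; dataJson must already be valid JSON. */
    static String signalJson(String id, String type, String dataJson) {
        return "{\"id\":\"" + id + "\",\"type\":\"" + type + "\",\"data\":" + dataJson + "}";
    }

    /** Escapes backslashes and double quotes so the JSON can sit inside a JSON string value. */
    static String escapeForJsonString(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        String signal = signalJson(
                "924e3ff8-2245-43ca-ba77-2af9af02fa07",
                "log",
                "{\"message\":\"Signal message received from http endpoint.\"}");
        System.out.println(signal);
        // The escaped form is what would go into the "body" field of the Mock Server expectation.
        System.out.println("\"body\": \"" + escapeForJsonString(signal) + "\"");
    }
}
```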
Create the HTTP signal channel by implementing the SignalChannelReader interface as shown below:
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.channels.SignalChannelReader;

public class HttpSignalChannel implements SignalChannelReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSignalChannel.class);
    public static final String CHANNEL_NAME = "http";
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private CommonConnectorConfig connectorConfig;

    @Override
    public String name() { // (1)
        return CHANNEL_NAME;
    }

    @Override
    public void init(CommonConnectorConfig connectorConfig) { // (2)
        this.connectorConfig = connectorConfig;
    }

    @Override
    public List<SignalRecord> read() { // (3)
        // Return only the signals fetched by this call. A shared list that is never
        // cleared would hand previously read signals back again on every poll.
        List<SignalRecord> signals = new ArrayList<>();
        try {
            String requestUrl = "http://mockServer:1080/api/signal?code=10969";
            // Send the HTTP request to the Mock Server
            HttpClient httpClient = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(requestUrl))
                    .GET()
                    .header("Content-Type", "application/json")
                    .build();
            // Read the response
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == 200) {
                // Parse the response body
                JsonNode signalJson = MAPPER.readTree(response.body());
                Map<String, Object> additionalData = signalJson.has("additionalData")
                        ? MAPPER.convertValue(signalJson.get("additionalData"), new TypeReference<Map<String, Object>>() {})
                        : new HashMap<>();
                String id = signalJson.get("id").asText();
                String type = signalJson.get("type").asText();
                String data = signalJson.get("data").toString();
                SignalRecord signal = new SignalRecord(id, type, data, additionalData);
                LOGGER.info("Recorded signal event '{}' ", signal);
                // Hand the signal over for processing
                signals.add(signal);
            } else {
                LOGGER.warn("Error while reading signaling events from endpoint: {}", response.statusCode());
            }
        } catch (IOException e) {
            LOGGER.warn("Exception while preparing to process the signal from the endpoint", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can react to it
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted while reading the signal from the endpoint", e);
        }
        return signals;
    }

    @Override
    public void close() { // (4)
        // No resources are held between reads
    }
}
| 1 | The name() method returns the name of the signal channel. To enable Debezium to use the channel, specify the name http in the connector’s signal.enabled.channels property. |
| 2 | The init() method can be used to initialize specific configuration, variables, or connections that the HTTP channel requires. |
| 3 | The read() method reads signals from the HTTP endpoint and returns a list of SignalRecord objects, which are processed by the Debezium connectors. |
| 4 | The close() method closes all allocated resources. |
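The endpoint URL in read() is hardcoded, whereas init() is the natural place to pick up such settings. As a rough sketch of that idea, the helper below builds the signal request from plain key/value settings and adds a request timeout so that a slow endpoint cannot block a poll indefinitely. The property names signal.http.url and signal.http.code, the five-second timeout, and the class itself are assumptions for illustration; they are not Debezium configuration options.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;

public class SignalRequestSketch {

    // Hypothetical property names; they are not Debezium configuration options.
    static final String URL_PROPERTY = "signal.http.url";
    static final String CODE_PROPERTY = "signal.http.code";

    /** Builds the GET request for the signal endpoint, falling back to the values used in this post. */
    static HttpRequest buildRequest(Map<String, String> settings) {
        String baseUrl = settings.getOrDefault(URL_PROPERTY, "http://mockServer:1080/api/signal");
        String code = settings.getOrDefault(CODE_PROPERTY, "10969");
        String query = "code=" + URLEncoder.encode(code, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "?" + query))
                .timeout(Duration.ofSeconds(5)) // assumed value; bounds how long one read may wait
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest(Map.of(URL_PROPERTY, "http://localhost:1080/api/signal"));
        System.out.println(request.method() + " " + request.uri());
    }
}
```

In the channel itself, the settings map would be filled from the connector configuration received in init(), and the resulting request would be sent from read().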
Create the notification channel by implementing the NotificationChannel interface as shown below:
import static java.net.HttpURLConnection.HTTP_CREATED;
import static java.net.HttpURLConnection.HTTP_OK;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.notification.Notification;
import io.debezium.pipeline.notification.channels.NotificationChannel;

public class HttpNotificationChannel implements NotificationChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpNotificationChannel.class);
    public static final String CHANNEL_NAME = "http";
    private static final String NOTIFICATION_PREFIX = "[HTTP NOTIFICATION SERVICE]";

    @Override
    public String name() { // (1)
        return CHANNEL_NAME;
    }

    @Override
    public void init(CommonConnectorConfig config) { // (2)
        // custom configuration
    }

    @Override
    public void send(Notification notification) { // (3)
        LOGGER.info("{} Sending notification to http channel", NOTIFICATION_PREFIX);
        String binId = createBin();
        if (binId == null) {
            // createBin() returns null when Postbin does not answer with 201 Created
            LOGGER.warn("{} No bin was created, notification not sent", NOTIFICATION_PREFIX);
            return;
        }
        sendNotification(binId, notification);
    }

    private static String createBin() {
        // Create a bin on the server
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI("https://www.toptal.com/developers/postbin/api/bin"))
                    .POST(HttpRequest.BodyPublishers.ofString(" "))
                    .build();
            HttpClient httpClient = HttpClient.newHttpClient();
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == HTTP_CREATED) {
                // Pull the binId value out of the JSON response
                String binId = response.body().replaceAll(".*\"binId\":\"([^\"]+)\".*", "$1");
                LOGGER.info("Bin created: {}", response.body());
                return binId;
            }
        } catch (URISyntaxException | InterruptedException | IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    private static void sendNotification(String binId, Notification notification) {
        // Post the notification to the bin
        try {
            ObjectMapper mapper = new ObjectMapper();
            String notificationString = mapper.writeValueAsString(notification);
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI("https://www.toptal.com/developers/postbin/" + binId))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(notificationString))
                    .build();
            HttpClient httpClient = HttpClient.newHttpClient();
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == HTTP_OK) {
                LOGGER.info("Notification received : {}", response.body());
            }
        } catch (URISyntaxException | InterruptedException | IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() { // (4)
    }
}
| 1 | The name() method returns the name of the notification channel. To enable Debezium to use the channel, specify http in the connector’s notification.enabled.channels property. |
| 2 | The init() method can be used to initialize specific configuration, variables, or connections that the channel requires. |
| 3 | The send() method sends the notification to the channel. The notification contains the SignalRecord object which is processed by the Debezium connectors. |
| 4 | The close() method closes all allocated resources. |
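One detail of createBin() deserves care: String.replaceAll returns its input unchanged when the pattern does not match, so an unexpected response body would end up being used as the bin ID. A possible alternative, sketched below with a hypothetical class and method name, extracts the value with a Matcher and reports absence explicitly. Parsing the body with a JSON library would work equally well.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BinIdSketch {

    // Matches "binId":"<value>" and captures the value.
    private static final Pattern BIN_ID = Pattern.compile("\"binId\"\\s*:\\s*\"([^\"]+)\"");

    /** Returns the binId from a Postbin response body, or empty if the field is absent. */
    static Optional<String> extractBinId(String responseBody) {
        if (responseBody == null) {
            return Optional.empty();
        }
        Matcher matcher = BIN_ID.matcher(responseBody);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String body = "{\"binId\":\"1688742588469-1816775151528\",\"now\":1688742588470,\"expires\":1688744388470}";
        System.out.println(extractBinId(body).orElse("<no bin id>"));
        System.out.println(extractBinId("{}").orElse("<no bin id>"));
    }
}
```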
Declare the HTTP signal and notification channels in the META-INF/services directory under io.debezium.pipeline.signal.channels.SignalChannelReader and io.debezium.pipeline.notification.channels.NotificationChannel files respectively.
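Files under META-INF/services follow the provider-configuration convention of Java's ServiceLoader: the file is named after the fully qualified interface, and each line in it names one implementation class. The self-contained sketch below illustrates that convention with a hypothetical Channel interface and HttpChannel class standing in for the Debezium types; it writes the provider file into a temporary directory and then discovers the implementation through it. It is an illustration of the mechanism, not a description of Debezium's internal loading code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ServiceFileSketch {

    /** Hypothetical stand-in for a channel interface such as SignalChannelReader. */
    public interface Channel {
        String name();
    }

    /** Hypothetical implementation; providers need a public no-argument constructor. */
    public static class HttpChannel implements Channel {
        public HttpChannel() {
        }

        @Override
        public String name() {
            return "http";
        }
    }

    /** Writes a provider-configuration file to a temp directory and loads providers through it. */
    static List<String> discoverChannelNames() {
        try {
            Path root = Files.createTempDirectory("services-sketch");
            Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
            // File name: fully qualified interface name. Content: one implementation class per line.
            Files.writeString(services.resolve(Channel.class.getName()),
                    HttpChannel.class.getName() + "\n");

            List<String> names = new ArrayList<>();
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] { root.toUri().toURL() }, ServiceFileSketch.class.getClassLoader())) {
                for (Channel channel : ServiceLoader.load(Channel.class, loader)) {
                    names.add(channel.name());
                }
            }
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(discoverChannelNames());
    }
}
```

For the channels in this post, the log output shown later suggests the implementation classes are io.debezium.examples.signal.HttpSignalChannel and io.debezium.examples.notification.HttpNotificationChannel, so each provider file would contain the corresponding one of those names.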
Compile and export the Java project as a JAR file. This can be done using Maven or your preferred build tool. Copy the JAR file to the directory that contains the JAR file for the Debezium connector you want to use. For example, if you want to use the custom signal and notification channels with the Debezium Postgres connector, copy the JAR file to the /kafka/connect/debezium-connector-postgres directory.
This example provides a Docker Compose file that defines the necessary services, including the Mock Server, Zookeeper, Kafka Connect, and the Postgres database.
To start the services, run the following command:
export DEBEZIUM_VERSION=2.3
docker-compose up -d
After ensuring that the services are up and running, and the Postgres database is ready to accept connections, the next step is to register the connector. This involves creating a connector configuration file. Let’s create a file named register-postgres.json with the following properties:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": 1,
    "database.hostname": "postgres",
    "database.port": 5432,
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "schema.include.list": "inventory",
    "signal.enabled.channels": "http",
    "notification.enabled.channels": "http"
  }
}
| signal.enabled.channels | Specifies the signal channel to be used by the connector. In this case, the connector uses the http signal channel. |
| notification.enabled.channels | Specifies the notification channel to be used by the connector. In this case, the connector uses the http notification channel. |
Now that we have the connector configuration file prepared, we can proceed to register the connector with Kafka Connect by executing the following command:
curl -i -X POST -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @register-postgres.json
Once the connector is successfully registered, you can review the connector logs to observe the signal events. The logs provide insights into the processing and progress of the connector, including any signal-related information. You will encounter log messages similar to the following:
Recorded signal event 'SignalRecord{id='924e3ff8-2245-43ca-ba77-2af9af02fa07', type='log', data='{"message":"Signal message received from http endpoint."}', additionalData={}}'    [io.debezium.examples.signal.HttpSignalChannel]
Additionally, you might notice log messages related to notification events being sent to Postbin. For example:
[HTTP NOTIFICATION SERVICE] Sending notification to http channel   [io.debezium.examples.notification.HttpNotificationChannel]
Bin created: {"binId":"1688742588469-1816775151528","now":1688742588470,"expires":1688744388470}   [io.debezium.examples.notification.HttpNotificationChannel]
These messages provide information about the notification event, such as the creation of a bin with a unique identifier (binId) and other relevant details. To retrieve the notification event from Postbin, take the binId from the log message and open the following URL: https://www.toptal.com/developers/postbin/b/:binId. Replace :binId in the URL with the actual binId obtained from the connector logs.
The notification event sent to Postbin looks like the following:
 
 Conclusion
In this tutorial, we explored how to create custom signal and notification channels for Debezium connectors. We created a custom signal channel that receives a signal event from an HTTP endpoint. We also created a custom notification channel that sends a notification event to an HTTP endpoint.
Debezium’s comprehensive signaling and notification system offers seamless integration with third-party solutions, allowing users to stay informed about the state and progress of Debezium connectors. The system’s extensibility empowers users to tailor both the signal and notification channels to their own needs.
Stay tuned for Part 3 of this series, where we will explore JMX signaling and notifications. In the meantime, you can check out the Debezium documentation for more information about signal and notification channels.
If you have any questions or feedback, please feel free to reach out to us on the Debezium mailing list or the #community-general channel on the Zulip chat. We would love to hear from you!
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas on how we can improve Debezium, please let us know or log an issue.