Welcome to the third installment of our series on Debezium signaling and notifications. In this article, we look at how to enable and use these features through the JMX channel.

We will also see how to send signals and receive notifications over a REST API by leveraging Jolokia.

Interact with Debezium via JMX

JMX stands for Java Management Extensions, a Java technology for managing and monitoring Java applications. It provides a standardized way to monitor application performance, configure settings, and interact with running Java applications using various management tools and clients. JMX is particularly useful for managing and monitoring complex, distributed, and enterprise-level Java applications.
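To make the MBean model concrete before turning to Debezium, the following Java sketch queries the JVM's own platform MBean server. It only touches the built-in java.lang:type=Runtime MBean, not any Debezium bean, and the class and method names are illustrative. It shows the two primitives the rest of this article relies on: finding MBeans by ObjectName pattern and reading an attribute.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/**
 * Minimal look at the JMX model using the JVM's own platform MBean server:
 * MBeans are addressed by an ObjectName and expose attributes and operations.
 */
public class JmxBasics {

    /** Finds the MBean names matching a pattern (similar in spirit to jmxterm's "beans -d <domain>"). */
    static Set<ObjectName> findBeans(String pattern) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            return server.queryNames(new ObjectName(pattern), null);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads one attribute of one MBean (similar in spirit to jmxterm's "get -b <bean> <attribute>"). */
    static Object readAttribute(String beanName, String attribute) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            return server.getAttribute(new ObjectName(beanName), attribute);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Every JVM registers built-in MBeans in the "java.lang" domain.
        for (ObjectName name : findBeans("java.lang:type=Runtime,*")) {
            System.out.println(name);
        }
        System.out.println("Uptime (ms): " + readAttribute("java.lang:type=Runtime", "Uptime"));
    }
}
```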

Enable signaling via JMX channel

Signaling in Debezium lets you trigger actions, such as an ad hoc snapshot, while the connector is running. As discussed in previous articles, Debezium provides several signal channels out of the box. In this article, we will focus on the JMX channel.

To start using the JMX signal channel, we need to:

  • Enable the JMX server on the Kafka Connect service

  • Add jmx to the signal.enabled.channels connector configuration property

  • Use a JMX client to connect to the JMX server and send signals
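For readers who prefer a programmatic client over a command-line tool, here is a minimal Java sketch of the connection step. It assumes the JVM's default JMX agent with its RMI connector, whose standard URL form is service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi, and port 9012 as used in the Docker Compose setup in this article; the class name JmxConnect is hypothetical.

```java
import java.io.IOException;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxConnect {

    /** Standard RMI URL of a JVM started with -Dcom.sun.management.jmxremote.port=<port>. */
    static String jmxUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    /** Opens a JMX connection; the caller is responsible for closing the connector. */
    static JMXConnector connect(String host, int port) throws IOException {
        return JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl(host, port)));
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: java JmxConnect <host> <port>   e.g. localhost 9012");
            return;
        }
        try (JMXConnector connector = connect(args[0], Integer.parseInt(args[1]))) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Lists the MBean domains; debezium.<connector-type> should appear once a connector runs.
            for (String domain : conn.getDomains()) {
                System.out.println(domain);
            }
        }
    }
}
```

The connector is opened in a try-with-resources block because JMXConnector is Closeable; leaving remote connections open keeps RMI resources alive on the Kafka Connect side.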

Debezium exposes the signal MBean named debezium.<connector-type>:type=management,context=signals,server=<server>. This bean exposes a signal operation that accepts three parameters:

  • p0: The id of the signal.

  • p1: The type of the signal, for example, execute-snapshot.

  • p2: A JSON data field that contains additional information about the specified signal type.
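The sketch below illustrates how a Java client could call this operation. It assumes the operation is named signal and takes three String arguments in the order p0, p1, p2. SignalStub is a hypothetical stand-in registered on the local MBean server under the same kind of name, so the call can be tried without a running connector; against a real deployment you would pass a remote MBeanServerConnection instead.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

public class DebeziumSignalSketch {

    /** Hypothetical management interface mirroring the signal operation (not Debezium's own class). */
    public interface SignalStubMBean {
        void signal(String id, String type, String data);
    }

    /** Hypothetical stand-in bean that just records the last signal it received. */
    public static class SignalStub implements SignalStubMBean {
        volatile String lastSignal;

        @Override
        public void signal(String id, String type, String data) {
            lastSignal = id + "|" + type + "|" + data;
        }
    }

    /** Builds debezium.<connector-type>:type=management,context=signals,server=<server>. */
    static ObjectName signalBeanName(String connectorType, String server) {
        try {
            return new ObjectName("debezium." + connectorType
                    + ":type=management,context=signals,server=" + server);
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Invokes the "signal" operation with its three String parameters (p0, p1, p2). */
    static void sendSignal(MBeanServerConnection conn, ObjectName bean,
                           String id, String type, String data) {
        String s = String.class.getName();
        try {
            conn.invoke(bean, "signal", new Object[] {id, type, data}, new String[] {s, s, s});
        } catch (Exception e) {
            throw new IllegalStateException("signal failed", e);
        }
    }

    /** Offline dry run: registers the stub locally, sends it one signal, then unregisters it. */
    static String dryRun() {
        try {
            MBeanServer local = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = signalBeanName("postgres", "dbserver1");
            SignalStub stub = new SignalStub();
            if (local.isRegistered(name)) {
                local.unregisterMBean(name);
            }
            local.registerMBean(stub, name);
            sendSignal(local, name, "12345", "execute-snapshot",
                    "{\"data-collections\":[\"inventory.orders\"],\"type\":\"INCREMENTAL\"}");
            local.unregisterMBean(name);
            return stub.lastSignal;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The canonical form sorts key properties, matching the name jmxterm displays.
        System.out.println(signalBeanName("postgres", "dbserver1").getCanonicalName());
        System.out.println(dryRun());
    }
}
```

Note that ObjectName key properties are unordered: type=management,context=signals,server=dbserver1 and context=signals,server=dbserver1,type=management refer to the same bean, which is why a JMX client may list the name with its keys sorted.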

Enable notifications via JMX channel

Notifications keep you informed about what is happening inside Debezium. Reading them through the JMX channel makes it easy to monitor the connector, for example, to follow the progress of an incremental snapshot.

To start using the JMX notification channel, we need to:

  • Enable the JMX server on the Kafka Connect service

  • Add jmx to the notification.enabled.channels connector configuration property

  • Use a JMX client to connect to the JMX server and read notifications

Debezium exposes the notification MBean named debezium.<connector-type>:type=management,context=notifications,server=<server>. This bean exposes a Notifications attribute that contains a list of JMX CompositeData entries with the following items:

  • id: A unique identifier assigned to the notification. For incremental snapshot notifications, the id is the same as the one sent with the execute-snapshot signal.

  • aggregate_type: The data type of the aggregate root to which the notification is related. In domain-driven design, exported events should always refer to an aggregate.

  • type: Provides status information about the event specified in the aggregate_type field.

  • additional_data: A Map<String,String> with detailed information about the notification.

Let’s now see how to trigger an incremental snapshot and receive notifications about its progress through the JMX channel.

Send incremental snapshot signal through JMX channel

For this example, we will use the Debezium Docker images with a PostgreSQL database.

We can start all the required services with the following Docker Compose file:

version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:2.4
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:2.4
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    container_name: postgres
    image: quay.io/debezium/example-postgres:2.4
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    container_name: connect
    image: quay.io/debezium/connect:2.4
    ports:
      - 8083:8083
      - 9012:9012 (1)
      - 8778:8778
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - JMXPORT=9012 (2)
      - JMXHOST=0.0.0.0 (3)
      - ENABLE_JOLOKIA=true
1 Exposes port 9012, which is used to connect to the JMX server.
2 Enables JMX and specifies the port number used for JMX. The value sets the JVM parameter -Dcom.sun.management.jmxremote.port=$JMXPORT.
3 The IP address or resolvable host name of the Docker host, which JMX uses to construct the URL sent to the JMX client. A value of localhost or 127.0.0.1 will not work; typically, 0.0.0.0 can be used. The value sets the JVM parameter -Djava.rmi.server.hostname=$JMXHOST.

After saving the file as debezium.yaml, we can start all services with:

docker compose -f debezium.yaml up -d

The output will be similar to this:

[+] Running 5/5
 ✔ Network deploy_default        Created    0.1s
 ✔ Container deploy-zookeeper-1  Started    0.1s
 ✔ Container deploy-postgres-1   Started    0.1s
 ✔ Container deploy-kafka-1      Started    0.1s
 ✔ Container deploy-connect-1    Started

Now we can check that all services are up and running by executing:

docker ps

The output should be similar to this:

CONTAINER ID   IMAGE                            COMMAND                  CREATED         STATUS         PORTS                                                                              NAMES
f1d49fb79dba   quay.io/debezium/connect:2.4                "/docker-entrypoint.…"   3 seconds ago   Up 2 seconds   0.0.0.0:8083->8083/tcp, 0.0.0.0:8778->8778/tcp, 0.0.0.0:9012->9012/tcp, 9092/tcp   deploy-connect-1
e164b2651fbf   quay.io/debezium/kafka:2.4       "/docker-entrypoint.…"   3 seconds ago   Up 2 seconds   0.0.0.0:9092->9092/tcp                                                             deploy-kafka-1
e61116f22f9d   quay.io/debezium/example-postgres:2.4    "docker-entrypoint.s…"   4 seconds ago   Up 2 seconds   0.0.0.0:5432->5432/tcp                                                             deploy-postgres-1
ccb502882928   quay.io/debezium/zookeeper:2.4   "/docker-entrypoint.…"   4 seconds ago   Up 2 seconds   0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp             deploy-zookeeper-1

At this point, all services are up and running, so we can register the connector with the following configuration:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "snapshot.mode": "NEVER",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "signal.enabled.channels": "source,jmx", (1)
    "signal.data.collection": "inventory.debezium_signal", (2)
    "notification.enabled.channels": "jmx"
  }
}
1 Enables the source and jmx channels. Even if we only want to use JMX to send the signal that executes an incremental snapshot, the source channel is still required, because Debezium uses the signal table to write watermarks to the database log for event deduplication.
2 Sets the table used for signaling.
For now, don’t worry about the notification.enabled.channels property. We will cover it later.

After saving this configuration into a file named postgres-jmx.json, we can register it.

To register the connector, we can call the Kafka Connect REST API with curl:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.server.id":"184054","database.dbname":"postgres","topic.prefix":"dbserver1","snapshot.mode":"NEVER","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","signal.enabled.channels":"source,jmx","signal.data.collection":"inventory.debezium_signal","notification.enabled.channels":"jmx"}}'

Alternatively, I would suggest using kcctl, a modern and intuitive command-line client for Kafka Connect.

First, we need to create a configuration context that points to our Kafka Connect cluster:

kcctl config set-context local --cluster http://localhost:8083

Then we can register the connector by running:

kcctl apply -f postgres-jmx.json

We can now look at the logs of the connect container

docker logs connect

and check that the connector has started streaming events:

INFO   Postgres|dbserver1|streaming  Starting streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]

Prepare database for incremental snapshot

Since incremental snapshots require signal.data.collection to be defined, we need to create the signal table in our PostgreSQL database.

The signal data collection is not required when using MySQL with GTIDs enabled and read.only set to true.

To create the signal table, we need to connect to our PostgreSQL instance. We can use the psql client inside the postgres container.

docker exec -it postgres bash

Once inside the container, we can connect to the PostgreSQL instance with the following command (the password is postgres):

psql -h localhost -d postgres -U postgres

Then we can check that the inventory schema already contains some tables:

\dt inventory.*

The command should return something similar to:

                List of relations
  Schema   |       Name       | Type  |  Owner
-----------+------------------+-------+----------
 inventory | customers        | table | postgres
 inventory | geom             | table | postgres
 inventory | orders           | table | postgres
 inventory | products         | table | postgres
 inventory | products_on_hand | table | postgres
 inventory | spatial_ref_sys  | table | postgres
(6 rows)

We need to create the signal table with the following command:

CREATE TABLE inventory.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

Send incremental snapshot signal

To send the signal through the JMX channel, we must connect to the JMX server. We use the jmxterm client; after downloading it, we can run the following commands:

java -jar jmxterm-1.0.4-uber.jar (1)

open localhost:9012 (2)

beans -d debezium.postgres (3)

run -b debezium.postgres:context=signals,server=dbserver1,type=management signal 12345 execute-snapshot {"data-collections":["inventory.orders"],"type":"INCREMENTAL"} (4)
1 Run the jmxterm client.
2 Open a connection to the JMX server.
3 List the beans under the debezium.postgres domain.
4 Invoke the signal operation to trigger an incremental snapshot of the inventory.orders table.
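The same signal can be sent from a Java program instead of jmxterm. This is a sketch, assuming the JMX endpoint from the Compose file (localhost:9012), the bean name used above, and a signal operation taking three String arguments; the class name SendIncrementalSnapshot and its naive JSON building are illustrative. Run without arguments, it only prints the payload it would send.

```java
import java.util.List;
import java.util.stream.Collectors;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class SendIncrementalSnapshot {

    /** Builds the p2 data field, e.g. {"data-collections":["inventory.orders"],"type":"INCREMENTAL"}. */
    static String snapshotData(List<String> dataCollections) {
        String tables = dataCollections.stream()
                .map(t -> "\"" + t + "\"")   // naive quoting: assumes names contain no quotes
                .collect(Collectors.joining(","));
        return "{\"data-collections\":[" + tables + "],\"type\":\"INCREMENTAL\"}";
    }

    public static void main(String[] args) throws Exception {
        String data = snapshotData(List.of("inventory.orders"));
        if (args.length == 0) {
            // Dry run: print the payload that would be sent.
            System.out.println(data);
            return;
        }
        // args[0] is host:port of the Kafka Connect JMX server, e.g. localhost:9012
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + args[0] + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName bean = new ObjectName(
                    "debezium.postgres:type=management,context=signals,server=dbserver1");
            String s = String.class.getName();
            // Same call as the jmxterm "run ... signal 12345 execute-snapshot {...}" command above
            conn.invoke(bean, "signal", new Object[] {"12345", "execute-snapshot", data},
                    new String[] {s, s, s});
            System.out.println("signal sent: " + data);
        }
    }
}
```

With Java 11 or later, the file can be launched directly, for example java SendIncrementalSnapshot.java localhost:9012. For table names that might contain quotes or backslashes, a proper JSON library would be a safer way to build the data field.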

Check data

After that, we want to check that all the data from the orders table has been correctly captured in its corresponding Kafka topic.

We can enter the Kafka container with the following command:

docker exec -it kafka bash

Once inside the container, we can read all messages in the dbserver1.inventory.orders topic with the following command:

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.inventory.orders --from-beginning

The output should look something like this:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.orders.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 10001,
      "order_date": 16816,
      "purchaser": 1001,
      "quantity": 1,
      "product_id": 102
    },
    "source": {
      "version": "2.4.0-SNAPSHOT",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1695631605203,
      "snapshot": "incremental",
      "db": "postgres",
      "sequence": "[\"34837776\",\"34837776\"]",
      "schema": "inventory",
      "table": "orders",
      "txId": null,
      "lsn": null,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1695631605204,
    "transaction": null
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.orders.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 10002,
      "order_date": 16817,
      "purchaser": 1002,
      "quantity": 2,
      "product_id": 105
    },
    "source": {
      "version": "2.4.0-SNAPSHOT",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1695631605204,
      "snapshot": "incremental",
      "db": "postgres",
      "sequence": "[\"34837776\",\"34837776\"]",
      "schema": "inventory",
      "table": "orders",
      "txId": null,
      "lsn": null,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1695631605204,
    "transaction": null
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.orders.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 10003,
      "order_date": 16850,
      "purchaser": 1002,
      "quantity": 2,
      "product_id": 106
    },
    "source": {
      "version": "2.4.0-SNAPSHOT",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1695631605204,
      "snapshot": "incremental",
      "db": "postgres",
      "sequence": "[\"34837776\",\"34837776\"]",
      "schema": "inventory",
      "table": "orders",
      "txId": null,
      "lsn": null,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1695631605204,
    "transaction": null
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.orders.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 10004,
      "order_date": 16852,
      "purchaser": 1003,
      "quantity": 1,
      "product_id": 107
    },
    "source": {
      "version": "2.4.0-SNAPSHOT",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1695631605204,
      "snapshot": "incremental",
      "db": "postgres",
      "sequence": "[\"34837776\",\"34837776\"]",
      "schema": "inventory",
      "table": "orders",
      "txId": null,
      "lsn": null,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1695631605204,
    "transaction": null
  }
}

That’s all! We have sent an incremental snapshot signal using the JMX channel.

Monitor incremental snapshot progress through JMX channel

Since we have executed an incremental snapshot, we can now read the notifications produced by Debezium via the JMX channel.

We registered our connector with the following configuration:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "snapshot.mode": "NEVER",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "signal.enabled.channels": "source,jmx",
    "signal.data.collection": "inventory.debezium_signal",
    "notification.enabled.channels": "jmx" (1)
  }
}
1 Enables the jmx notification channel.
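Since the JSON above is exactly the body that Kafka Connect expects for a new connector, registration can also be scripted. The following is a minimal sketch, assuming Kafka Connect's REST API is published on `localhost:8083` (as in the Docker Compose file used later in this article). It only builds the `POST /connectors` request and fails early if `jmx` is missing from either channel list; the helper names `enabled_channels` and `build_registration_request` are hypothetical.

```python
import json
import urllib.request

# Assumption: Kafka Connect's REST API is reachable on localhost:8083,
# as in the Docker Compose file used in this article.
CONNECT_URL = "http://localhost:8083/connectors"


def enabled_channels(config: dict, key: str) -> set:
    """Split a comma-separated channel list such as "source,jmx" into a set."""
    return {c.strip() for c in config.get(key, "").split(",") if c.strip()}


def build_registration_request(name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    # Fail early if either JMX channel is missing from the configuration.
    for key in ("signal.enabled.channels", "notification.enabled.channels"):
        if "jmx" not in enabled_channels(config, key):
            raise ValueError(f"'jmx' is not listed in {key}")
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        CONNECT_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

With the stack running, `urllib.request.urlopen(build_registration_request("inventory-connector", config))` would submit it, where `config` is the `"config"` object shown above without the `(1)` callout marker.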

To access the notifications, we connect to the JMX server again. As we did for the signal, we use jmxterm:

java -jar jmxterm-1.0.4-uber.jar (1)

open localhost:9012 (2)

beans -d debezium.postgres (3)

get -b debezium.postgres:context=notifications,server=dbserver1,type=management Notifications (4)
1 Run the jmxterm client.
2 Open a connection to the JMX server.
3 List the beans under the debezium.postgres domain.
4 Read the Notifications attribute.
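Note that jmxterm lists the object name with its keys in alphabetical order (`context`, `server`, `type`), while the pattern given earlier starts with `type=management`. JMX object names compare equal regardless of key order, so both forms address the same bean. The sketch below, with hypothetical helper names, builds the name from the pattern, compares names key-order-insensitively (with a simplified parser that ignores quoted values), and renders the `open` and `get` commands from the session above as a script. Feeding such a script to jmxterm's non-interactive mode is an assumption here; check `java -jar jmxterm-1.0.4-uber.jar --help` for the exact flags of your version.

```python
def debezium_mbean(connector_type: str, context: str, server: str) -> str:
    """Build a Debezium management object name ("signals" or "notifications"),
    following the pattern described in this article."""
    return f"debezium.{connector_type}:type=management,context={context},server={server}"


def parse_object_name(name: str) -> tuple:
    """Split "domain:k1=v1,k2=v2" into (domain, {k1: v1, k2: v2}).

    Simplification: quoted values containing ',' or '=' are not handled.
    """
    domain, _, props = name.partition(":")
    pairs = (item.split("=", 1) for item in props.split(","))
    return domain, {key: value for key, value in pairs}


def same_mbean(a: str, b: str) -> bool:
    """Object names are equal regardless of key order, so compare parsed keys."""
    return parse_object_name(a) == parse_object_name(b)


def jmxterm_script(host_port: str, connector_type: str, server: str) -> str:
    """Render the open/get commands of the interactive session as script text."""
    mbean = debezium_mbean(connector_type, "notifications", server)
    return "\n".join([f"open {host_port}", f"get -b {mbean} Notifications"])
```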

You should see output similar to the following:

#mbean = debezium.postgres:context=notifications,server=dbserver1,type=management:
Notifications = [ {
  additionalData = {
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
   };
  aggregateType = Initial Snapshot;
  id = b20bec8d-f21f-4d74-bb75-cdd7f4c7d933;
  type = SKIPPED;
 }, (1)
{
  additionalData = {
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
    ( data_collections ) = {
      key = data_collections;
      value = inventory.orders;
     };
   };
  aggregateType = Incremental Snapshot;
  id = 12345;
  type = STARTED;
 }, (2)
{
  additionalData = {
    ( current_collection_in_progress ) = {
      key = current_collection_in_progress;
      value = inventory.orders;
     };
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
    ( maximum_key ) = {
      key = maximum_key;
      value = 10004;
     };
    ( last_processed_key ) = {
      key = last_processed_key;
      value = 10004;
     };
    ( data_collections ) = {
      key = data_collections;
      value = inventory.orders;
     };
   };
  aggregateType = Incremental Snapshot;
  id = 12345;
  type = IN_PROGRESS;
 }, (3)
{
  additionalData = {
    ( scanned_collection ) = {
      key = scanned_collection;
      value = inventory.orders;
     };
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
    ( total_rows_scanned ) = {
      key = total_rows_scanned;
      value = 4;
     };
    ( status ) = {
      key = status;
      value = SUCCEEDED;
     };
    ( data_collections ) = {
      key = data_collections;
      value = inventory.orders;
     };
   };
  aggregateType = Incremental Snapshot;
  id = 12345;
  type = TABLE_SCAN_COMPLETED;
 }, (4)
{
  additionalData = {
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
   };
  aggregateType = Incremental Snapshot;
  id = 12345;
  type = COMPLETED;
 } (5)
];
1 This notification comes from the initial snapshot. Its type is SKIPPED because our connector is configured with "snapshot.mode": "NEVER".
2 This notification reports that the incremental snapshot has started.
3 This notification reports that the snapshot of the inventory.orders table is in progress and provides the last processed key and the maximum key. In this example, there is only one IN_PROGRESS notification, but depending on your table size and snapshot.fetch.size, you can receive more.
4 This notification reports that the snapshot of a specific table has completed and provides the total number of rows scanned.
5 This is the last notification in this example; it reports that the entire incremental snapshot has completed.
JMX also provides its own notification mechanism, and Debezium emits its notifications through it as well. Depending on your JMX client, you can subscribe to these JMX notifications and receive them as soon as they are emitted, without polling the Notifications attribute.
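The notification sequence above can be reduced to a per-table status. The following is a minimal sketch, assuming each notification is available as a Python dict with the field names shown in the output (`aggregateType`, `type`, `additionalData`); the function name `snapshot_progress` is hypothetical, and the comma-separated form of `data_collections` for several tables is an assumption, since the example only contains one table. The keys are reported as-is rather than turned into a percentage, because primary keys are not necessarily numeric or evenly distributed.

```python
from typing import Dict, List


def snapshot_progress(notifications: List[dict]) -> Dict[str, str]:
    """Summarize Incremental Snapshot notifications per data collection.

    Later notifications overwrite earlier ones, so the result reflects the
    most recent state reported for each table.
    """
    progress: Dict[str, str] = {}
    for notification in notifications:
        if notification.get("aggregateType") != "Incremental Snapshot":
            continue  # e.g. the SKIPPED Initial Snapshot notification
        data = notification.get("additionalData", {})
        kind = notification.get("type")
        if kind == "STARTED":
            # Assumption: several collections would be comma-separated.
            for table in data.get("data_collections", "").split(","):
                if table.strip():
                    progress[table.strip()] = "started"
        elif kind == "IN_PROGRESS":
            table = data["current_collection_in_progress"]
            progress[table] = (
                f"in progress (last key {data['last_processed_key']}, "
                f"max key {data['maximum_key']})"
            )
        elif kind == "TABLE_SCAN_COMPLETED":
            table = data["scanned_collection"]
            progress[table] = f"{data['status']} ({data['total_rows_scanned']} rows)"
    return progress
```

Applied to the five notifications above, the summary for `inventory.orders` ends as `SUCCEEDED (4 rows)`.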

Leveraging Jolokia for REST-based Signaling and Notifications

Jolokia is a JMX-HTTP bridge: it exposes the MBeans of a JMX server through a REST API with JSON payloads. With it, we can interact with Debezium over HTTP, using the same signaling and notification MBeans. This way, you can send signals and receive notifications through a more familiar REST API.
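In Jolokia's GET form, a JMX read maps to a URL of the shape `<agent>/read/<mbean>/<attribute>`. The sketch below, with hypothetical function names, builds such a URL and applies Jolokia's path escaping as summarized from its protocol documentation (`!` becomes `!!`, `/` becomes `!/`, `"` becomes `!"`); verify these rules against the Jolokia version you run. The Debezium object names in this article contain none of those characters, so escaping leaves them unchanged, and this sketch does not percent-encode anything else.

```python
def escape_jolokia_path(part: str) -> str:
    """Escape one path segment for a Jolokia GET URL.

    "!" must be escaped first, otherwise the "!" introduced by the other
    replacements would be doubled.
    """
    return part.replace("!", "!!").replace("/", "!/").replace('"', '!"')


def jolokia_read_url(base_url: str, mbean: str, attribute: str) -> str:
    """Map a JMX (MBean, attribute) pair to a Jolokia GET read URL."""
    return (
        f"{base_url.rstrip('/')}/read/"
        f"{escape_jolokia_path(mbean)}/{escape_jolokia_path(attribute)}"
    )
```

For the notifications bean, `jolokia_read_url("http://localhost:8778/jolokia", "debezium.postgres:context=notifications,server=dbserver1,type=management", "Notifications")` yields the same URL that the curl command in the next sections uses.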

To use Jolokia, we need to enable its agent in our Kafka Connect container.

This is the Docker Compose file used in our example:

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.4
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.4
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:2.4
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    image: quay.io/debezium/connect:2.4
    ports:
      - 8083:8083
      - 9012:9012
      - 8778:8778 (1)
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - JMXPORT=9012
      - JMXHOST=0.0.0.0
      - ENABLE_JOLOKIA=true (2)
1 Exposes the port used by the Jolokia agent.
2 Enables the Jolokia agent that is already shipped in our test images. To enable the agent in your own installation, see the official Jolokia documentation.

Sending Signals via Jolokia

To send signals via Jolokia, we can send an HTTP POST request to the Jolokia endpoint with the desired signal and parameters.

Continuing our incremental snapshot example, you can trigger an incremental snapshot of the inventory.products table with the following command:

curl -X POST 'http://localhost:8778/jolokia/exec' -d '{"type":"EXEC","mbean":"debezium.postgres:context=signals,server=dbserver1,type=management","operation":"signal","arguments":["12345","execute-snapshot","{\"data-collections\": [\"inventory.products\"], \"type\": \"INCREMENTAL\"}"]}' | jq

This should be the output:

{
  "request": {
    "mbean": "debezium.postgres:context=signals,server=dbserver1,type=management",
    "arguments": [
      "12345",
      "execute-snapshot",
      "{\"data-collections\": [\"inventory.products\"], \"type\": \"INCREMENTAL\"}"
    ],
    "type": "exec",
    "operation": "signal"
  },
  "value": null,
  "timestamp": 1695651387,
  "status": 200
}
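The curl command can also be issued from a script. The following is a minimal sketch, assuming the Jolokia agent listens on `localhost:8778` as in the Compose file; it posts the request to the agent's base URL, where Jolokia reads the request type from the JSON body. The third signal argument (`p2`) is produced with `json.dumps`, which avoids hand-escaping the nested quotes seen in the curl command. The helper names are hypothetical, and the sketch checks the `status` field of the response body because Jolokia reports request-level errors there.

```python
import json
import urllib.request

# Assumption: the Jolokia agent listens on port 8778, as in the Compose file.
JOLOKIA_URL = "http://localhost:8778/jolokia/"


def signal_request(connector_type: str, server: str, signal_id: str,
                   signal_type: str, data: dict) -> dict:
    """Build a Jolokia exec request for the signal(p0, p1, p2) operation."""
    return {
        "type": "exec",
        "mbean": f"debezium.{connector_type}:context=signals,server={server},type=management",
        "operation": "signal",
        # p2 must be a JSON string; json.dumps handles the quoting.
        "arguments": [signal_id, signal_type, json.dumps(data)],
    }


def post_to_jolokia(payload: dict, url: str = JOLOKIA_URL) -> dict:
    """POST a request to the Jolokia agent and return the decoded response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        body = json.load(response)
    # Jolokia reports request-level failures in the JSON "status" field.
    if body.get("status") != 200:
        raise RuntimeError(f"Jolokia returned {body.get('status')}: {body.get('error')}")
    return body
```

With the stack running, `post_to_jolokia(signal_request("postgres", "dbserver1", "12345", "execute-snapshot", {"data-collections": ["inventory.products"], "type": "INCREMENTAL"}))` should return a response like the one above.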

Receiving Notifications via Jolokia

Jolokia also allows you to fetch notifications from Debezium with an HTTP GET request:

curl -X GET 'http://localhost:8778/jolokia/read/debezium.postgres:context=notifications,server=dbserver1,type=management/Notifications' | jq

This should be the output:

{
  "request": {
    "mbean": "debezium.postgres:context=notifications,server=dbserver1,type=management",
    "attribute": "Notifications",
    "type": "read"
  },
  "value": [
    {
      "additionalData": {
        "connector_name": "dbserver1"
      },
      "id": "b20bec8d-f21f-4d74-bb75-cdd7f4c7d933",
      "type": "SKIPPED",
      "aggregateType": "Initial Snapshot"
    },
    {
      "additionalData": {
        "connector_name": "dbserver1",
        "data_collections": "inventory.orders"
      },
      "id": "12345",
      "type": "STARTED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "last_processed_key": "10004",
        "current_collection_in_progress": "inventory.orders",
        "connector_name": "dbserver1",
        "maximum_key": "10004",
        "data_collections": "inventory.orders"
      },
      "id": "12345",
      "type": "IN_PROGRESS",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "scanned_collection": "inventory.orders",
        "connector_name": "dbserver1",
        "total_rows_scanned": "4",
        "status": "SUCCEEDED",
        "data_collections": "inventory.orders"
      },
      "id": "12345",
      "type": "TABLE_SCAN_COMPLETED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "connector_name": "dbserver1"
      },
      "id": "12345",
      "type": "COMPLETED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "connector_name": "dbserver1",
        "data_collections": "inventory.products"
      },
      "id": "12345",
      "type": "STARTED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "last_processed_key": "109",
        "current_collection_in_progress": "inventory.products",
        "connector_name": "dbserver1",
        "maximum_key": "109",
        "data_collections": "inventory.products"
      },
      "id": "12345",
      "type": "IN_PROGRESS",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "scanned_collection": "inventory.products",
        "connector_name": "dbserver1",
        "total_rows_scanned": "9",
        "status": "SUCCEEDED",
        "data_collections": "inventory.products"
      },
      "id": "12345",
      "type": "TABLE_SCAN_COMPLETED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "connector_name": "dbserver1"
      },
      "id": "12345",
      "type": "COMPLETED",
      "aggregateType": "Incremental Snapshot"
    }
  ],
  "timestamp": 1695652278,
  "status": 200
}

The output now also contains the notifications for the incremental snapshot of the inventory.products table, which we triggered through the REST API.
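As the output shows, the Notifications attribute accumulates entries (the inventory.orders notifications are still listed), and the signal id `12345` is reused across snapshots. A client that waits for a table to finish should therefore match on the table name rather than the id. The following is a minimal polling sketch under those observations, assuming the Jolokia agent on `localhost:8778`; it reads the attribute with a Jolokia POST `read` request (equivalent to the GET above), and all function names are hypothetical. `wait_for_scan` takes the fetch function as a parameter so it can be exercised without a running stack.

```python
import json
import time
import urllib.request
from typing import Callable, List

# Assumption: the Jolokia agent listens on localhost:8778, as in the Compose file.
JOLOKIA_URL = "http://localhost:8778/jolokia/"


def fetch_notifications(connector_type: str, server: str,
                        url: str = JOLOKIA_URL) -> List[dict]:
    """Read Debezium's Notifications attribute with a Jolokia read request."""
    payload = {
        "type": "read",
        "mbean": f"debezium.{connector_type}:context=notifications,server={server},type=management",
        "attribute": "Notifications",
    }
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        body = json.load(response)
    if body.get("status") != 200:
        raise RuntimeError(f"Jolokia returned {body.get('status')}: {body.get('error')}")
    return body["value"]


def scan_completed(notifications: List[dict], table: str) -> bool:
    """True once a TABLE_SCAN_COMPLETED notification for `table` is present."""
    return any(
        n.get("aggregateType") == "Incremental Snapshot"
        and n.get("type") == "TABLE_SCAN_COMPLETED"
        and n.get("additionalData", {}).get("scanned_collection") == table
        for n in notifications
    )


def wait_for_scan(fetch: Callable[[], List[dict]], table: str,
                  timeout_s: float = 60.0, interval_s: float = 2.0) -> bool:
    """Poll `fetch` until `table` has been scanned or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        if scan_completed(fetch(), table):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

A call such as `wait_for_scan(lambda: fetch_notifications("postgres", "dbserver1"), "inventory.products")` would block until the products scan is reported. Because old entries stay in the list, snapshotting the same table a second time would match immediately; a real client would likely need to remember which notifications it has already seen.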

Conclusion

In this third installment of our series on Debezium Signaling and Notifications, we’ve learned how to enable and manage both signaling and notifications using JMX and Jolokia. Signaling lets you dynamically control Debezium’s behavior, while notifications keep you informed about critical events. By combining these capabilities with Jolokia, you can manage, monitor, and interact with your data streaming workflows over HTTP, keeping Debezium under your control at all times.

Fiore Mario Vitale

Mario is a Senior Software Engineer at Red Hat. He lives in Italy.

   


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas on how we can improve Debezium, please let us know or log an issue.