Welcome to the third installment of our series on Debezium Signaling and Notifications. In this article, we continue our exploration of these features and delve into how to enable and manage them using the JMX channel.
We will also explore how to send signals and get notifications through the REST API leveraging Jolokia.
Interact with Debezium via JMX
JMX stands for Java Management Extensions, a Java technology for managing and monitoring Java applications. It provides a standardized way to monitor application performance, configure settings, and interact with running Java applications using various management tools and clients. JMX is particularly useful for managing and monitoring complex, distributed, and enterprise-level Java applications.
Enable signaling via JMX channel
Signaling lets you trigger actions in Debezium while the connector is running. As discussed in previous articles, Debezium offers several out-of-the-box signal channels. In this article, we focus on the JMX channel.
To start using the JMX signal channel, we need to:
- Enable the JMX server on the Kafka Connect service.
- Add jmx to the signal.enabled.channels connector configuration property.
- Use a JMX client to connect to the JMX server and send signals.
Debezium exposes the signal MBean named debezium.<connector-type>:type=management,context=signals,server=<server>. This bean exposes a signal operation that accepts three parameters:
- p0: The id of the signal.
- p1: The type of the signal, for example, execute-snapshot.
- p2: A JSON data field that contains additional information about the specified signal type.
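To make the naming scheme and the parameter order concrete, here is a minimal Python sketch that assembles the MBean name and the three arguments of the signal operation. The helper names are hypothetical; the only things taken from Debezium are the name pattern and the p0/p1/p2 order described above. Note that p2 is a JSON document passed as a string, not a structured object.

```python
import json


def signal_mbean_name(connector_type: str, server: str) -> str:
    """Name of the signal MBean, following the pattern
    debezium.<connector-type>:type=management,context=signals,server=<server>."""
    return f"debezium.{connector_type}:type=management,context=signals,server={server}"


def signal_arguments(signal_id: str, signal_type: str, data: dict) -> list:
    """Arguments p0, p1 and p2 of the signal operation, in that order.

    p2 has to be a JSON string, so the dict is serialized here.
    """
    return [signal_id, signal_type, json.dumps(data)]


name = signal_mbean_name("postgres", "dbserver1")
args = signal_arguments(
    "12345",
    "execute-snapshot",
    {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"},
)
print(name)
print(args)
```

JMX key properties are unordered, so clients such as jmxterm may display the same name in canonical form as debezium.postgres:context=signals,server=dbserver1,type=management, which is the form used in the commands later in this article.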
Enable notifications via JMX channel
Notifications keep you informed about what happens inside Debezium. Accessing them via the JMX channel lets you monitor Debezium easily, for example, to follow the progress of an incremental snapshot.
To start using the JMX notification channel, we need to:
- Enable the JMX server on the Kafka Connect service.
- Add jmx to the notification.enabled.channels connector configuration property.
- Use a JMX client to connect to the JMX server and access the notifications.
Debezium exposes the notification MBean named debezium.<connector-type>:type=management,context=notifications,server=<server>. This bean provides a Notifications attribute that contains a list of JMX CompositeData entries with the following attributes:
Property | Description |
---|---|
id | A unique identifier that is assigned to the notification. For incremental snapshot notifications, the id is the same as the one sent with the execute-snapshot signal. |
aggregate_type | The data type of the aggregate root to which a notification is related. In domain-driven design, exported events should always refer to an aggregate. |
type | Provides status information about the event specified in the aggregate_type field. |
additional_data | A Map<String,String> with detailed information about the notification. |
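If you process notifications in your own tooling, it can help to model an entry explicitly. The following Python dataclass is a hypothetical client-side sketch that mirrors the four attributes above; it is not part of Debezium. One assumption worth calling out: in the jmxterm and Jolokia output shown later in this article, the keys appear in camelCase (aggregateType, additionalData), so the parser accepts both spellings.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Notification:
    """Client-side model of one entry of the Notifications attribute.

    Hypothetical helper, not part of Debezium; the fields mirror the table above.
    """

    id: str
    aggregate_type: str
    type: str
    additional_data: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, raw: dict) -> "Notification":
        # JMX clients render the CompositeData keys in camelCase
        # (aggregateType, additionalData); accept the snake_case spelling too.
        return cls(
            id=str(raw["id"]),
            aggregate_type=raw.get("aggregateType", raw.get("aggregate_type", "")),
            type=raw["type"],
            additional_data=dict(raw.get("additionalData", raw.get("additional_data", {}))),
        )


n = Notification.from_dict(
    {
        "id": "12345",
        "aggregateType": "Incremental Snapshot",
        "type": "STARTED",
        "additionalData": {"connector_name": "dbserver1", "data_collections": "inventory.orders"},
    }
)
print(n)
```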
Let’s see how to send an incremental snapshot signal and receive notifications about its progress through the JMX channel.
Send incremental snapshot signal through JMX channel
We will use the Debezium Docker images with a PostgreSQL database for this example.
We can start all the required services using the following Docker Compose file:
version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:2.4
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:2.4
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    container_name: postgres
    image: quay.io/debezium/example-postgres:2.4
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    container_name: connect
    image: quay.io/debezium/connect:2.4
    ports:
      - 8083:8083
      - 9012:9012 # (1)
      - 8778:8778
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - JMXPORT=9012 # (2)
      - JMXHOST=0.0.0.0 # (3)
      - ENABLE_JOLOKIA=true
1 | Exposes port 9012, which JMX clients use to connect to the JMX server. |
2 | Enables JMX and specifies the port number that will be used for JMX. The value is used to specify the JVM parameter -Dcom.sun.management.jmxremote.port=$JMXPORT. |
3 | The IP address or resolvable host name of the Docker host, which JMX uses to construct a URL sent to the JMX client. A value of localhost or 127.0.0.1 will not work. Typically, 0.0.0.0 can be used. The value is used to specify the JVM parameter -Djava.rmi.server.hostname=$JMXHOST. |
After saving the file as debezium.yaml, we can start all services with:
docker compose -f debezium.yaml up -d
The output will be something like this:
[+] Running 5/5
✔ Network deploy_default Created 0.1s
✔ Container deploy-zookeeper-1 Started 0.1s
✔ Container deploy-postgres-1 Started 0.1s
✔ Container deploy-kafka-1 Started 0.1s
✔ Container deploy-connect-1 Started
Now we can check that all services are up and running by executing:
docker ps
The output should be similar to this:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f1d49fb79dba quay.io/debezium/connect:2.4 "/docker-entrypoint.…" 3 seconds ago Up 2 seconds 0.0.0.0:8083->8083/tcp, 0.0.0.0:8778->8778/tcp, 0.0.0.0:9012->9012/tcp, 9092/tcp deploy-connect-1
e164b2651fbf quay.io/debezium/kafka:2.4 "/docker-entrypoint.…" 3 seconds ago Up 2 seconds 0.0.0.0:9092->9092/tcp deploy-kafka-1
e61116f22f9d quay.io/debezium/example-postgres:2.4 "docker-entrypoint.s…" 4 seconds ago Up 2 seconds 0.0.0.0:5432->5432/tcp deploy-postgres-1
ccb502882928 quay.io/debezium/zookeeper:2.4 "/docker-entrypoint.…" 4 seconds ago Up 2 seconds 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp deploy-zookeeper-1
At this point, all services are up and running, so we can register the connector with the following configuration:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "snapshot.mode": "NEVER",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "signal.enabled.channels": "source,jmx", (1)
    "signal.data.collection": "inventory.debezium_signal", (2)
    "notification.enabled.channels": "jmx"
  }
}
1 | Enables the source and jmx signal channels. Even though we only want to use JMX to send the signal that starts an incremental snapshot, the source channel is still required, because Debezium uses the signal table to write watermarks to the database log for event deduplication. |
2 | Sets the table used for signaling. |
For now, don’t worry about the notification.enabled.channels property. We will delve into it later.
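Because forgetting the source channel is an easy mistake, a pre-flight check can catch it before registration. This is a minimal Python sketch, assuming the configuration is available as a dict of strings; the function name is hypothetical, and the rules only cover this PostgreSQL setup (other setups, such as MySQL with GTIDs and read.only, have different requirements).

```python
def check_signal_config(config: dict) -> list:
    """List problems in the signaling settings of a connector config.

    Hypothetical helper encoding the rules described above for this
    PostgreSQL example: JMX signals must be enabled, and incremental
    snapshots also need the source channel and a signal table.
    """
    channels = {
        c.strip()
        for c in config.get("signal.enabled.channels", "").split(",")
        if c.strip()
    }
    problems = []
    if "jmx" not in channels:
        problems.append("signal.enabled.channels does not include 'jmx'")
    if "source" not in channels:
        problems.append("signal.enabled.channels does not include 'source'")
    if not config.get("signal.data.collection"):
        problems.append("signal.data.collection is not set")
    return problems


config = {
    "signal.enabled.channels": "source,jmx",
    "signal.data.collection": "inventory.debezium_signal",
    "notification.enabled.channels": "jmx",
}
print(check_signal_config(config))  # prints []
```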
After saving this configuration into a file named postgres-jmx.json, we can register it.
To register the connector, we can use curl to call the Kafka Connect REST API:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.server.id":"184054","database.dbname":"postgres","topic.prefix":"dbserver1","snapshot.mode":"NEVER","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","signal.enabled.channels":"source,jmx","signal.data.collection":"inventory.debezium_signal","notification.enabled.channels":"jmx"}}'
Alternatively, I suggest using kcctl, a modern and intuitive command-line client for Kafka Connect.
First, we need to create a configuration context that points to our Kafka Connect instance:
kcctl config set-context local --cluster http://localhost:8083
Then we can register the connector by running the following command:
kcctl apply -f postgres-jmx.json
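If you prefer to script the registration, the same POST /connectors call can be made with Python's standard library. This is a sketch: the function name is hypothetical and the connector dict below is abridged, so fill in the remaining properties from postgres-jmx.json before sending it.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build, without sending, a POST /connectors request for Kafka Connect."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "topic.prefix": "dbserver1",
        "signal.enabled.channels": "source,jmx",
        "signal.data.collection": "inventory.debezium_signal",
        "notification.enabled.channels": "jmx",
        # ...remaining properties from postgres-jmx.json (abridged here)
    },
}
req = build_register_request("http://localhost:8083", connector)
print(req.get_method(), req.full_url)
```

Passing req to urllib.request.urlopen sends it; Kafka Connect answers with 201 Created when the connector has been registered.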
We can now get the logs of the connect container:
docker logs connect
and check that the connector has started streaming events:
INFO Postgres|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
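Besides reading the logs, Kafka Connect reports the state of a connector and its tasks at GET /connectors/inventory-connector/status (for example, curl -s localhost:8083/connectors/inventory-connector/status). A minimal Python check of that response body could look like this; it only inspects the state fields, and the function name is hypothetical.

```python
import json


def connector_is_running(status_body: str) -> bool:
    """True if the connector and all of its tasks report RUNNING.

    status_body is the JSON returned by Kafka Connect's
    GET /connectors/<name>/status endpoint.
    """
    status = json.loads(status_body)
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A running connector without tasks is not streaming anything yet.
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
```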
Prepare database for incremental snapshot
Since incremental snapshots require the signal.data.collection property to be defined, we need to create the signal table in our PostgreSQL database.
The signal data collection is not required when using MySQL with GTIDs and read.only set to true.
To create the signal table, we need to connect to our PostgreSQL instance. We can use the psql client inside the postgres container:
docker exec -it postgres bash
Once inside the container, we can connect to the PostgreSQL instance with:
psql -h localhost -d postgres -U postgres
The password is postgres.
Then we can check that the inventory schema already contains some tables:
\dt inventory.*
The command should return something similar to:
List of relations
Schema | Name | Type | Owner
-----------+------------------+-------+----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
We need to create the signal table with the following command:
CREATE TABLE inventory.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
Send incremental snapshot signal
To send the signal through the JMX channel, we must connect to the JMX server. We use the jmxterm client; after downloading it, we can run it and issue the following commands:
java -jar jmxterm-1.0.4-uber.jar (1)
open localhost:9012 (2)
beans -d debezium.postgres (3)
run -b debezium.postgres:context=signals,server=dbserver1,type=management signal 12345 execute-snapshot {"data-collections":["inventory.orders"],"type":"INCREMENTAL"} (4)
1 | Run the jmxterm client |
2 | Open a connection to the JMX server |
3 | List the beans in the debezium.postgres domain |
4 | Invoke the signal operation to trigger an incremental snapshot of the inventory.orders table. |
Check data
After that, we want to check that all the data from the orders table has been correctly captured in its corresponding Kafka topic.
We can enter the Kafka container with the following command:
docker exec -it kafka bash
Once inside the container, we can read all messages in the dbserver1.inventory.orders topic with the following command:
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.inventory.orders --from-beginning
The output should be something like this:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.orders.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 10001,
"order_date": 16816,
"purchaser": 1001,
"quantity": 1,
"product_id": 102
},
"source": {
"version": "2.4.0-SNAPSHOT",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1695631605203,
"snapshot": "incremental",
"db": "postgres",
"sequence": "[\"34837776\",\"34837776\"]",
"schema": "inventory",
"table": "orders",
"txId": null,
"lsn": null,
"xmin": null
},
"op": "r",
"ts_ms": 1695631605204,
"transaction": null
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.orders.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 10002,
"order_date": 16817,
"purchaser": 1002,
"quantity": 2,
"product_id": 105
},
"source": {
"version": "2.4.0-SNAPSHOT",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1695631605204,
"snapshot": "incremental",
"db": "postgres",
"sequence": "[\"34837776\",\"34837776\"]",
"schema": "inventory",
"table": "orders",
"txId": null,
"lsn": null,
"xmin": null
},
"op": "r",
"ts_ms": 1695631605204,
"transaction": null
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.orders.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 10003,
"order_date": 16850,
"purchaser": 1002,
"quantity": 2,
"product_id": 106
},
"source": {
"version": "2.4.0-SNAPSHOT",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1695631605204,
"snapshot": "incremental",
"db": "postgres",
"sequence": "[\"34837776\",\"34837776\"]",
"schema": "inventory",
"table": "orders",
"txId": null,
"lsn": null,
"xmin": null
},
"op": "r",
"ts_ms": 1695631605204,
"transaction": null
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.orders.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 10004,
"order_date": 16852,
"purchaser": 1003,
"quantity": 1,
"product_id": 107
},
"source": {
"version": "2.4.0-SNAPSHOT",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1695631605204,
"snapshot": "incremental",
"db": "postgres",
"sequence": "[\"34837776\",\"34837776\"]",
"schema": "inventory",
"table": "orders",
"txId": null,
"lsn": null,
"xmin": null
},
"op": "r",
"ts_ms": 1695631605204,
"transaction": null
}
}
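Scrolling through raw events works for four rows; for larger tables, a short script helps. The following Python sketch assumes each event is available as one JSON string (the console consumer prints one event per line; the events above are shown formatted across several lines). The function name is hypothetical. It keeps only incremental snapshot read events and converts order_date, whose io.debezium.time.Date type counts days since the epoch, into a calendar date.

```python
import datetime
import json

EPOCH = datetime.date(1970, 1, 1)


def incremental_snapshot_rows(messages):
    """Yield the row state of each incremental snapshot read event.

    messages: iterable of JSON strings, one Debezium event (schema + payload)
    per string. order_date uses io.debezium.time.Date, i.e. days since the
    epoch, so it is converted to a datetime.date.
    """
    for raw in messages:
        payload = json.loads(raw)["payload"]
        if payload["op"] != "r" or payload["source"].get("snapshot") != "incremental":
            continue
        row = dict(payload["after"])
        row["order_date"] = EPOCH + datetime.timedelta(days=row["order_date"])
        yield row
```

Applied to the four events above, it would yield order dates 2016-01-16, 2016-01-17, 2016-02-19, and 2016-02-21.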
That’s all! We have sent an incremental snapshot signal using the JMX channel.
Monitor incremental snapshot progress through JMX channel
Since we have executed an incremental snapshot, we can now read the notifications that Debezium produced through the JMX channel.
We used the following configuration to register our connector:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "snapshot.mode": "NEVER",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "signal.enabled.channels": "source,jmx",
    "signal.data.collection": "inventory.debezium_signal",
    "notification.enabled.channels": "jmx" (1)
  }
}
1 | Enables the jmx notification channel. |
To access the notifications, we need to connect to the JMX server again. As we did for the signal, we use jmxterm:
java -jar jmxterm-1.0.4-uber.jar (1)
open localhost:9012 (2)
beans -d debezium.postgres (3)
get -b debezium.postgres:context=notifications,server=dbserver1,type=management Notifications (4)
1 | Run the jmxterm client |
2 | Open a connection to the JMX server |
3 | List the beans in the debezium.postgres domain |
4 | Read the Notifications attribute of the notification MBean. |
You should see output like the following:
#mbean = debezium.postgres:context=notifications,server=dbserver1,type=management:
Notifications = [ {
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Initial Snapshot;
id = b20bec8d-f21f-4d74-bb75-cdd7f4c7d933;
type = SKIPPED;
}, (1)
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = STARTED;
}, (2)
{
additionalData = {
( current_collection_in_progress ) = {
key = current_collection_in_progress;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( maximum_key ) = {
key = maximum_key;
value = 10004;
};
( last_processed_key ) = {
key = last_processed_key;
value = 10004;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = IN_PROGRESS;
}, (3)
{
additionalData = {
( scanned_collection ) = {
key = scanned_collection;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( total_rows_scanned ) = {
key = total_rows_scanned;
value = 4;
};
( status ) = {
key = status;
value = SUCCEEDED;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = TABLE_SCAN_COMPLETED;
}, (4)
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = COMPLETED;
} (5)
];
1 | This is a notification from the initial snapshot with the status SKIPPED since our connector is configured with "snapshot.mode": "NEVER" |
2 | This is the notification about the incremental snapshot start |
3 | This notification tells us that the snapshot of the inventory.orders table is in progress, and it provides the last processed key and the maximum key. In this example, there is only one in-progress notification, but depending on your table size and the incremental snapshot chunk size (incremental.snapshot.chunk.size), you can get more. |
4 | This notification tells us that the snapshot of a specific table has been completed, and it provides the total number of rows scanned. |
5 | This is the last notification for this example, and it tells us that the entire incremental snapshot has completed. |
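Reading the list by eye works for one table; for longer snapshots, a small client-side helper can reduce it to a per-snapshot status. The following Python sketch is hypothetical and assumes the entries have already been converted to dicts with the keys shown above (id, type, aggregateType, additionalData), which is, for instance, how Jolokia renders them as JSON. It groups by id because incremental snapshot notifications carry the signal id; reusing an id across signals therefore merges their runs, which is one reason to send each signal with a unique id.

```python
from collections import defaultdict


def summarize_incremental_snapshots(notifications):
    """Reduce a Notifications list to one summary per snapshot id.

    notifications: dicts with the keys id, type, aggregateType and
    additionalData, in the order Debezium reported them.
    Returns {id: {"last_type": str, "rows": {table: rows_scanned}}}.
    """
    summary = defaultdict(lambda: {"last_type": None, "rows": {}})
    for n in notifications:
        if n.get("aggregateType") != "Incremental Snapshot":
            continue  # e.g. the SKIPPED initial snapshot notification
        entry = summary[n["id"]]
        entry["last_type"] = n["type"]
        if n["type"] == "TABLE_SCAN_COMPLETED":
            data = n.get("additionalData", {})
            entry["rows"][data["scanned_collection"]] = int(data["total_rows_scanned"])
    return dict(summary)
```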
JMX also has its own notification mechanism, and Debezium emits its notifications through it as well. Depending on your JMX client, you can subscribe to these JMX notifications and receive them immediately, without polling the Notifications attribute.
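If your client cannot subscribe, polling works too. The sketch below repeatedly calls a fetch function that you supply (hypothetical; it could wrap the Jolokia read described later in this article or a jmxterm script) until a COMPLETED notification for the given snapshot id appears. The baseline parameter matters because the list keeps earlier entries, such as the SKIPPED initial snapshot notification above, and a reused signal id could match an older COMPLETED entry; record the list length before sending the signal and pass it as baseline.

```python
import time
from typing import Callable, List


def wait_for_completion(
    fetch: Callable[[], List[dict]],
    snapshot_id: str,
    baseline: int = 0,
    interval_s: float = 1.0,
    timeout_s: float = 300.0,
) -> List[dict]:
    """Poll until an Incremental Snapshot COMPLETED notification with the
    given id appears after position `baseline` in the Notifications list.

    Returns the notifications seen after the baseline.
    Raises TimeoutError if the snapshot does not complete in time.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        new = fetch()[baseline:]
        if any(
            n.get("id") == snapshot_id
            and n.get("aggregateType") == "Incremental Snapshot"
            and n.get("type") == "COMPLETED"
            for n in new
        ):
            return new
        if time.monotonic() >= deadline:
            raise TimeoutError(f"snapshot {snapshot_id} did not complete within {timeout_s}s")
        time.sleep(interval_s)
```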
Leveraging Jolokia for REST-based Signaling and Notifications
Jolokia is a JMX-HTTP bridge: it exposes the MBeans of a running JVM through an HTTP API with JSON payloads. With it, we can interact with Debezium's signaling and notification MBeans over HTTP, sending signals and receiving notifications with a familiar REST-style API.
To enable Jolokia, we need to enable its agent in our Kafka Connect container.
This is the Docker Compose file used in our example:
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.4
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.4
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:2.4
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    image: quay.io/debezium/connect:2.4
    ports:
      - 8083:8083
      - 9012:9012
      - 8778:8778 # (1)
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - JMXPORT=9012
      - JMXHOST=0.0.0.0
      - ENABLE_JOLOKIA=true # (2)
1 | Exposes port 8778, used by the Jolokia agent |
2 | Enables the Jolokia agent already shipped in the Debezium images used in this example. To enable the agent in your own installation, please check the official Jolokia documentation. |
Sending Signals via Jolokia
To send a signal via Jolokia, we send an HTTP POST request to the Jolokia endpoint with the MBean name, the operation, and its arguments.
Continuing with our incremental snapshot example, you can trigger an incremental snapshot of the inventory.products table with the following command:
curl -X POST 'http://localhost:8778/jolokia/exec' -d '{"type":"EXEC","mbean":"debezium.postgres:context=signals,server=dbserver1,type=management","operation":"signal","arguments":["12345","execute-snapshot","{\"data-collections\": [\"inventory.products\"], \"type\": \"INCREMENTAL\"}"]}' | jq
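The escaped quotes in the last argument are easy to get wrong by hand: the third argument of the signal operation is a JSON document passed as a string, so it is JSON inside JSON. The following Python sketch builds the same request with the standard library and lets json.dumps handle the nesting. The function name is hypothetical, and the URL is the one used in the curl command above.

```python
import json
import urllib.request

JOLOKIA_EXEC_URL = "http://localhost:8778/jolokia/exec"  # as in the curl example


def jolokia_signal_request(mbean: str, signal_id: str, signal_type: str,
                           data: dict) -> urllib.request.Request:
    """Build, without sending, a Jolokia EXEC request for the signal operation."""
    payload = {
        "type": "exec",
        "mbean": mbean,
        "operation": "signal",
        # p2 is a JSON document passed as a string: json.dumps inside json.dumps.
        "arguments": [signal_id, signal_type, json.dumps(data)],
    }
    return urllib.request.Request(
        JOLOKIA_EXEC_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = jolokia_signal_request(
    "debezium.postgres:context=signals,server=dbserver1,type=management",
    "12345",
    "execute-snapshot",
    {"data-collections": ["inventory.products"], "type": "INCREMENTAL"},
)
```

Passing req to urllib.request.urlopen sends it; on success, Jolokia answers with "status": 200 and "value": null, as in the output below.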
The output should be:
{
"request": {
"mbean": "debezium.postgres:context=signals,server=dbserver1,type=management",
"arguments": [
"12345",
"execute-snapshot",
"{\"data-collections\": [\"inventory.products\"], \"type\": \"INCREMENTAL\"}"
],
"type": "exec",
"operation": "signal"
},
"value": null,
"timestamp": 1695651387,
"status": 200
}
Receiving Notifications via Jolokia
Jolokia also allows you to fetch notifications from Debezium using HTTP GET requests.
curl -X GET 'http://localhost:8778/jolokia/read/debezium.postgres:context=notifications,server=dbserver1,type=management/Notifications' | jq
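For scripting, the same read can be done with Python's standard library. The helper names below are hypothetical, error handling is limited to the status field of Jolokia's JSON response, and the URL builder assumes the MBean name needs no escaping (true for the Debezium names used here).

```python
import json
import urllib.request

JOLOKIA_BASE = "http://localhost:8778/jolokia"


def read_url(mbean: str, attribute: str = "Notifications") -> str:
    """Jolokia GET URL that reads one attribute of an MBean.

    Assumes the MBean name needs no escaping: Jolokia has special escape
    rules for '/' and '!' in paths, which this sketch does not implement.
    """
    return f"{JOLOKIA_BASE}/read/{mbean}/{attribute}"


def parse_read_response(body: str) -> list:
    """Return the 'value' of a Jolokia read response, raising on errors."""
    response = json.loads(body)
    if response.get("status") != 200:
        raise RuntimeError(f"Jolokia error: {response.get('error', response)}")
    return response["value"]


def fetch_notifications(mbean: str) -> list:
    """Read the Notifications attribute over HTTP (needs the running stack)."""
    with urllib.request.urlopen(read_url(mbean), timeout=10) as resp:
        return parse_read_response(resp.read().decode("utf-8"))
```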
The output should be:
{
"request": {
"mbean": "debezium.postgres:context=notifications,server=dbserver1,type=management",
"attribute": "Notifications",
"type": "read"
},
"value": [
{
"additionalData": {
"connector_name": "dbserver1"
},
"id": "b20bec8d-f21f-4d74-bb75-cdd7f4c7d933",
"type": "SKIPPED",
"aggregateType": "Initial Snapshot"
},
{
"additionalData": {
"connector_name": "dbserver1",
"data_collections": "inventory.orders"
},
"id": "12345",
"type": "STARTED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"last_processed_key": "10004",
"current_collection_in_progress": "inventory.orders",
"connector_name": "dbserver1",
"maximum_key": "10004",
"data_collections": "inventory.orders"
},
"id": "12345",
"type": "IN_PROGRESS",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"scanned_collection": "inventory.orders",
"connector_name": "dbserver1",
"total_rows_scanned": "4",
"status": "SUCCEEDED",
"data_collections": "inventory.orders"
},
"id": "12345",
"type": "TABLE_SCAN_COMPLETED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"connector_name": "dbserver1"
},
"id": "12345",
"type": "COMPLETED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"connector_name": "dbserver1",
"data_collections": "inventory.products"
},
"id": "12345",
"type": "STARTED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"last_processed_key": "109",
"current_collection_in_progress": "inventory.products",
"connector_name": "dbserver1",
"maximum_key": "109",
"data_collections": "inventory.products"
},
"id": "12345",
"type": "IN_PROGRESS",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"scanned_collection": "inventory.products",
"connector_name": "dbserver1",
"total_rows_scanned": "9",
"status": "SUCCEEDED",
"data_collections": "inventory.products"
},
"id": "12345",
"type": "TABLE_SCAN_COMPLETED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"connector_name": "dbserver1"
},
"id": "12345",
"type": "COMPLETED",
"aggregateType": "Incremental Snapshot"
}
],
"timestamp": 1695652278,
"status": 200
}
You can see that we now also have the notifications about the incremental snapshot of the inventory.products table that we triggered through the REST API.
Conclusion
In this third installment of our series on Debezium Signaling and Notifications, we’ve learned how to enable and manage both signaling and notifications using JMX and Jolokia. Signaling lets you dynamically control Debezium’s behavior, while notifications keep you informed about critical events. By combining these capabilities with Jolokia, you can effectively manage, monitor, and interact with your data streaming workflows and stay in control of Debezium at all times.
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.