This post is part of a 3-part series to explore using Debezium to ingest changes from an Oracle database using Oracle LogMiner. In case you missed it, the first part of this series is here.

In this second installment, we will build on what we did in part one by deploying the Oracle connector using Zookeeper, Kafka, and Kafka Connect. We are going to discuss a variety of configuration options for the connector and why they’re essential. And finally, we’re going to see the connector in action!

Setting up Kafka Connect and prerequisites

In order to use Debezium, three separate services need to be started:

We will use Docker containers to run the above services. Using separate containers simplifies the deployment process so you can see Debezium in action. In addition, we will also download the Oracle JDBC driver and mount it as part of the Kafka Connect container.

Using multiple instances of these services in production provides performance, reliability, and fault tolerance. The deployment would typically involve a platform like OpenShift or Kubernetes to manage multiple containers, or you would use dedicated hardware and manage this manually.

For this blog, we will use a single instance of each service to keep it simple.

The Zookeeper and Kafka containers are ephemeral. Typically, volumes would be mounted on the host machine so that when the container stops, data managed by the container persists. For the sake of simplicity, we are skipping this step so that when the container stops, data is lost.

Prerequisites: Starting Zookeeper

The Zookeeper service is the first service that’s started. The Kafka broker uses Zookeeper to handle the leadership election of Kafka brokers and manages the service discovery within the cluster so that each broker knows when a sibling has joined or left when a broker terminates, and whom the new leader is for a given topic/partition tuple.

Open a new terminal window and run the following command:

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 \
  quay.io/debezium/zookeeper:1.9

The zookeeper container is started in interactive mode and destroyed when stopped. The container is named zookeeper, which will be important when starting future containers.

Prerequisites: Starting Kafka

The Kafka service is the second service that must be started and depends on the Zookeeper service. Debezium produces change events sent to topics managed by the Kafka broker.

Open a new terminal window and run the following command:

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper \
  quay.io/debezium/kafka:1.9

The kafka container is started in interactive mode and destroyed when stopped. The container is named kafka, which will be important starting future containers. Additionally, the kafka service also links to the zookeeper service, meaning that the canonical name zookeeper will resolve to the container running the zookeeper service.

Prerequisites: Download Oracle JDBC driver

The Debezium Kafka Connect image does not ship with the Oracle JDBC driver. To use Debezium for Oracle, the JDBC driver must be manually downloaded and mounted into the Debezium Kafka Connect image.

Navigate to the Oracle Database JDBC driver downloads page. At the time of this publication, the latest Oracle database is Oracle 21, so click on the ojdbc8.jar link under the Oracle 21c section. The downloaded jar will be used in the next section, adding the driver to the base image of Debezium’s Kafka Connect container.

Prerequisites: Starting Kafka Connect

The Kafka Connect service is the third and final service that must be started and depends on the Kafka service. Kafka Connect is responsible for managing all connectors and their related workloads and is the runtime environment accountable for running the Debezium Connector for Oracle when we deploy it shortly.

Open a new terminal window and run the following command:

docker run -it --rm --name connect -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  --link kafka:kafka \
  --link dbz_oracle21:dbz_oracle21 \
  -v /path/to/ojdbc8.jar:/kafka/libs/ojdbc8.jar \
  quay.io/debezium/connect:1.9

The connect container is started in interactive mode and destroyed when stopped. The container is named connect, and several environment variables control the naming of several required topics and some required configuration parameters. Additionally, the connect container links to the kafka container, meaning that the canonical name kafka will resolve to the container running the kafka broker service.

Unlike prior containers, we mount a volume using the -v command. The argument takes the format of local-path:container-path.

The local-path represents where the ojdbc8.jar file exists on the host machine. The container-path should remain /kafka/libs/ojdbc8.jar, installing the driver on the Kafka Connect classpath.

Create some initial test data

If the Oracle database created in part one of this series uses the Oracle container registry image, no initial data exists in the database. While this doesn’t necessarily present a problem, we’d ideally like to snapshot some data when deploying the Oracle connector; ergo, some initial data must exist before deployment.

In a new terminal, let’s connect to the database using SQL*Plus and create a new table with some initial data. The following uses the common user, connecting to the pluggable database ORCLPDB1. You can safely skip this step when connecting to an existing environment with tables to be captured.

docker exec -it -e ORACLE_SID=ORCLPDB1 dbz_oracle21 sqlplus c##dbzuser@ORCLPDB1

Once connected, use the following SQL to create a table and some initial data:

CREATE TABLE customers (id number(9,0) primary key, name varchar2(50));
INSERT INTO customers VALUES (1001, 'Salles Thomas');
INSERT INTO customers VALUES (1002, 'George Bailey');
INSERT INTO customers VALUES (1003, 'Edward Walker');
INSERT INTO customers VALUES (1004, 'Anne Kretchmar');
COMMIT;

By default, the redo logs only capture minimal information about changes in the CUSTOMERS table because supplemental logging is set only at the database level.

If you are familiar with PostgreSQL’s REPLICA IDENTITY or MySQL’s binlog_format, Oracle provides a similar mechanism called table-level supplemental logging, which we mentioned in part one of this series. Supplemental logging at the table level controls the columns captured in the redo logs when users modify rows. Setting the table’s supplemental log level to (ALL) COLUMNS guarantees that Oracle captures changes associated with INSERT, UPDATE, and DELETE operations in the redo logs.

Use the following SQL to set the table’s supplemental log level:

ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Suppose a captured table’s supplemental log level is incorrectly set. In that case, the connector will log a warning letting you know there is a problem so that you can adjust the table’s settings to capture all changes.

It is worth pointing out that while this example uses the same user account to create this CUSTOMERS table that the connector uses to connect, it’s not at all uncommon for the user used by the connector to differ from the user who owns the tables in the Oracle database. In this case, the connector user must have permission to read the captured tables, requiring the SELECT permission per table.

Deploying the Oracle connector

We are now ready to deploy the Debezium Oracle connector. Before registering the connector with Kafka Connect, let’s look at the configuration in-depth.

Below is a sample configuration we will use in this example:

{
  "name": "customers-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "dbz_oracle21",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.server.name": "server1",
    "table.include.list": "C##DBZUSER.CUSTOMERS",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes"
  }
}

Let’s take a dive into what each of these configuration options mean.

name

This is the name assigned to the connector, which must be unique across the Kafka connect cluster.

connector.class

This is the class implementation of the deployed connector. Each of the Debezium source connectors have a unique class name to identify which connector is being deployed.

tasks.max

This is the maximum number of tasks that will be assigned to the connector deployment in Kafka Connect. Most Debezium connectors read changse from the source database sequentially, therefore, a value of 1 often makes sense.

database.hostname

This is the database hostname or IP address. Since we specified a link to dbz_oracle21 container when starting Kafka Connect, we can use that name here to identify the container running the Oracle database. If you have a pre-existing Oracle environment on another host, specify the name of that host in this configuration property.

database.port

This is the port the database uses to listen for connections. Oracle’s default port is 1521 but a database administrator can configure this to be any available port. If you are connecting to a pre-existing Oracle instance, use the port the database uses.

database.user

This is the database user account used for JDBC connections. This should be the common user created in part one of this series, the c##dbzuser user. If you are connecting to an environment that doesn’t support multi-tenancy, this will be the user you created in the root database without the common-user prefix.

database.password

This is the database user account password.

database.dbname

This is the database service that the connector communications with. Regardless of whether multi-tenancy is enabled or not, this will always be the singular or root container database.

database.pdb.name

This is the optional pluggable database system identifier. This property must be provided when connecting to a database that supports multi-tenancy and refers to the PDB. If this field is omitted, the connector assumes the database does not support multi-tenancy.

database.server.name

The prefix used for all topics created by the connector. This value must be unique across all topic deployments within the Kafka Connect cluster.

table.include.list

A comma-separated list of regular expression or simple table names using the format of <schema>.<table> identifying what tables will be captured by the connector.

database.history.kafka.bootstrap.servers

This is the URL to the Kafka broker where the database history topic will be stored. Since we specified a link to kafka container when starting Kafka Connect, we can use that name here to point to the broker and its port.

database.history.kafka.topic

This is the name of the topic that will store the database schema history. This topic will be recovered when the connector restarts, populating the in-memory relational model from this topic.

All Debezium connectors, except PostgreSQL, use a schema history to store the schemas of all tables. This is often not ideal for Oracle databases, especially when deploying the connector without multi-tenancy.

To restrict the storage to only tables in the include list, modify the connector’s configuration by setting the database.history.store.only.captured.tables.ddl property to true.

For more information on other connector properties, you can review the Oracle documentation for more details.

To deploy the connector, save the above configuration to a file called register-oracle.json. Now, open a new terminal window and use the curl command to register the connector with Kafka Connect:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  localhost:8083/connectors \
  -d @register-oracle.json

If the registration is successful, the terminal where the connect container is running will start performing a snapshot of the data in the CUSTOMERS table. We can also confirm that the data exists in Kafka by using the Kafka console consumer tool and reading the topic’s contents to the local terminal.

To check the contents of the topic, use the same terminal where the connector was registered and execute the following command:

docker exec -it kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 0.0.0.0:9092 \
  --from-beginning \
  --property print.key=true \
  --topic server1.C__DBZUSER.CUSTOMERS

The topic converts the schema name from C##DBZUSER to C__DBZUSER because the topic naming strategy automatically guarantees that the topic’s name is compatible with Avro, which does not allow the hash sign character.

The output of the above command should look similar to the following:

{
  "schema":{
    ...
  },
  "payload":{
    "before":null,
    "after":{
      "ID":"1001",
      "NAME":"Salles Thomas"
    },
    "source":{
      "version":"1.9.6.Final",
      "connector":"oracle",
      "name":"server1",
      "ts_ms":1665102121000,
      "snapshot":"true",
      "db":"ORCLPDB1",
      "sequence":null,
      "schema":"C##DBZUSER",
      "table":"CUSTOMERS",
      "txId":null,
      "scn":"2868546",
      "commit_scn":null,
      "lcr_position":null,
      "rs_id":null,
      "ssn":0,
      "redo_thread":null
    },
    "op":"r",
    "ts_ms":1665102126961,
    "transaction":null
  }
}
...

You can now use the SQLPlus terminal where you created the initial test data to INSERT, UPDATE, or DELETE records within the CUSTOMERS table. You will see corresponding change events in the terminal that is presently tailing the server1.C__DBZUSER.CUSTOMERS topic.

Be mindful that SQLPlus does not enable auto-commit by default, so be sure that you automatically commit changes when you change data in the CUSTOMERS table so that it will be visible to the connector’s mining process.

Conclusion

During part one of this series, we discussed what Oracle is, why it’s so popular in the database world, and how to install and configure the database. During this part of the series, we’ve discussed how to install all the prerequisite services, including Zookeeper, Apache Kafka, and Apache Kafka Connect. In addition, we have also deployed a sample Oracle connector captured changes for the CUSTOMERS table.

In the next part of this series, I will discuss performance, how to monitor the connector, and the most critical metrics and why they are essential. We may even build a small dashboard with metrics.

Chris Cranford

Chris is a software engineer at Red Hat. He previously was a member of the Hibernate ORM team and now works on Debezium. He lives in North Carolina just a few hours from Red Hat towers.

   


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve ours existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.