OpenLineage Integration

About Data Lineage and OpenLineage

Data lineage tracks the flow of data through various systems, transformations, and processes. This information provides you with visibility into where data originates, how it moves, and what dependencies exist in the data pipeline. Insight into data lineage is crucial for the following activities:

  • Data governance and compliance

  • Impact analysis when making changes

  • Debugging data quality issues

  • Understanding data dependencies

OpenLineage is an open standard for data lineage that provides a unified way to collect and track lineage metadata across multiple data systems. The specification defines a common model for describing datasets, jobs, and runs, simplifying the process of building comprehensive lineage graphs across heterogeneous data infrastructures.

For more information, see the OpenLineage website and documentation.
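
To illustrate the model, the following is a minimal sketch of an OpenLineage run event. The identifiers, names, and addresses shown are hypothetical, and the events that Debezium emits contain additional facets and metadata:

{
  "eventType": "RUNNING",
  "eventTime": "2024-01-01T00:00:00.000Z",
  "run": { "runId": "d290f1ee-6c54-4b01-90e6-d701748f0851" },
  "job": { "namespace": "myNamespace", "name": "inventory.0" },
  "inputs": [ { "namespace": "postgres://postgres:5432", "name": "inventory.products" } ],
  "outputs": [ { "namespace": "kafka://kafka:9092", "name": "inventory.inventory.products" } ]
}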

Deployment types

The Debezium OpenLineage integration is available for the following deployment types:

Kafka Connect

You run Debezium connectors as Kafka Connect plugins.

Debezium Server

You run Debezium in standalone server mode.

The same OpenLineage event model and features are available for both deployment types, but the procedures for configuring the integration and installing dependencies differ between them.

How Debezium integrates with OpenLineage

To integrate with OpenLineage, Debezium maps events in its lifecycle to artifacts in the OpenLineage data model.

OpenLineage job mapping

The Debezium connector is mapped to an OpenLineage Job, which includes the following elements:

Name

A job inherits its name from the Debezium topic.prefix, combined with the task ID (for example, inventory.0).

Namespace

Inherited from openlineage.integration.job.namespace, if specified; otherwise, it defaults to the value of topic.prefix.

Debezium connector version

The version of the Debezium connector that is running and generating lineage events.

Complete connector configuration

All connector configuration properties, enabling full reproducibility and debugging of the data pipeline.

Job metadata

Description, tags, and owners.
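
For example, assuming topic.prefix=inventory, task 0, and the job metadata shown in the configuration examples later in this document, the job section of an emitted event might resemble the following sketch. The documentation and ownership facets shown are standard OpenLineage facets and are illustrative only; the facets that the integration actually emits, including the connector version and full configuration, can be structured differently:

"job": {
  "namespace": "myNamespace",
  "name": "inventory.0",
  "facets": {
    "documentation": { "description": "CDC connector for products database" },
    "ownership": { "owners": [ { "name": "Alice Smith", "type": "maintainer" } ] }
  }
}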

Dataset mapping for source connectors

The following dataset mappings are possible:

Input datasets

Represent the database tables from which Debezium is configured to capture changes. The OpenLineage integration automatically creates input datasets based on the connector configuration. The integration applies the following principles when it creates dataset mappings (an illustrative example follows the list):

  • Each table that the connector monitors becomes an input dataset.

  • Each dataset captures schema information for the corresponding source table, including the name and data type of each column.

  • DDL changes in the source table are reflected dynamically in the dataset schema.
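
For example, an input dataset for a hypothetical inventory.products table in PostgreSQL might look similar to the following sketch, which uses the standard OpenLineage schema facet to carry column names and types. The column names and types are examples only:

{
  "namespace": "postgres://postgres:5432",
  "name": "inventory.products",
  "facets": {
    "schema": {
      "fields": [
        { "name": "id", "type": "int4" },
        { "name": "name", "type": "varchar" },
        { "name": "weight", "type": "float8" }
      ]
    }
  }
}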

Output datasets

Represent the Kafka topics where CDC events are written. For Kafka Connect deployments, output datasets are created when you apply the OpenLineage single message transformation (SMT). For Debezium Server deployments, output datasets are automatically captured.

Output dataset mappings are created according to the following principles (an illustrative example follows the list):

  • Each Kafka topic that the connector produces becomes an output dataset.

  • The output dataset captures the complete CDC event structure, including metadata fields.

  • The name of the dataset is based on the connector’s topic prefix configuration.
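
For example, an output dataset for the change event topic inventory.inventory.products might resemble the following sketch. The flattened field names shown for the Debezium envelope (before, after, source, op) are illustrative; the exact schema representation that the integration produces can differ:

{
  "namespace": "kafka://kafka:9092",
  "name": "inventory.inventory.products",
  "facets": {
    "schema": {
      "fields": [
        { "name": "before.id", "type": "int32" },
        { "name": "after.id", "type": "int32" },
        { "name": "source.ts_ms", "type": "int64" },
        { "name": "op", "type": "string" }
      ]
    }
  }
}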

Dataset mapping for sink connectors

For sink connectors, the data flow is reversed compared to source connectors.

Input datasets

Represent the Kafka topics that the sink connector reads from. These topics typically contain CDC events from Debezium source connectors. The following principles apply when defining input datasets:

  • Each Kafka topic that the sink connector consumes represents an input dataset.

  • The input dataset specifies the Kafka topic schema and metadata.

  • The namespace format follows kafka://bootstrap-server:port, where the bootstrap server is specified via the openlineage.integration.dataset.kafka.bootstrap.servers property.

Output datasets

Represent the target databases or collections to which the sink connector writes data.

The following principles apply when defining output dataset mappings (an illustrative example follows the list):

  • Each target database table or collection represents an output dataset.

  • The output dataset specifies the schema information for the target database.

  • The namespace format depends on the target database system. For more information, see Dataset namespace formatting.
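
For example, for a hypothetical JDBC sink connector that reads the inventory.inventory.customers topic and writes to a PostgreSQL inventory.customers table, the dataset references in an emitted event might resemble the following excerpt. The topic, table, and server addresses are examples only:

"inputs": [
  { "namespace": "kafka://kafka:9092", "name": "inventory.inventory.customers" }
],
"outputs": [
  { "namespace": "postgres://postgres:5432", "name": "inventory.customers" }
]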

The following Debezium sink connectors support OpenLineage integration in Kafka Connect:

MongoDB Sink Connector

Writes CDC events to MongoDB collections.

JDBC Sink Connector

Writes CDC events to relational database tables.

Debezium Server currently supports OpenLineage integration only with the Kafka sink. The MongoDB sink and JDBC sink connectors are supported only in Kafka Connect deployments.

Run events

When you integrate Debezium with OpenLineage, the connector emits OpenLineage run events to report changes in its status. The connector emits run events for the following states:

START

Reports connector initialization.

RUNNING

Emitted periodically during normal streaming operations and while individual tables are being processed. These periodic events ensure continuous lineage tracking for long-running streaming CDC operations.

COMPLETE

Reports that the connector shut down gracefully.

FAIL

Reports that the connector encountered an error.

Required dependencies

The OpenLineage integration requires several JAR files that are bundled together in the debezium-openlineage-core-libs archive.

Kafka Connect

Before you can use Debezium with OpenLineage in Kafka Connect, complete the following steps to obtain the required dependencies:

  1. Download the OpenLineage core archive.

  2. Extract the contents of the archive into the Debezium plug-in directories in your Kafka Connect environment.

Debezium Server

Before you can use Debezium Server with OpenLineage, complete the following steps to obtain the required dependencies:

  1. Download the OpenLineage core archive.

  2. Extract the contents of the archive.

  3. Copy all JAR files to the /debezium/lib directory in your Debezium Server installation.

Configuring the integration

To enable the integration, you must configure the Debezium connector and the OpenLineage client. The configuration approach differs between Kafka Connect and Debezium Server deployments.

Kafka Connect configuration

To enable Debezium to integrate with OpenLineage in Kafka Connect, add properties to your connector configuration, as shown in the following example:

# Enable OpenLineage integration
openlineage.integration.enabled=true

# Path to OpenLineage configuration file
openlineage.integration.config.file.path=/path/to/openlineage.yml

# Job metadata (optional but recommended)
openlineage.integration.job.namespace=myNamespace
openlineage.integration.job.description=CDC connector for products database
openlineage.integration.job.tags=env=prod,team=data-engineering
openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer

Debezium Server configuration

To enable Debezium Server to integrate with OpenLineage, add OpenLineage properties to the application.properties file, as shown in the following example. OpenLineage properties use the debezium.source. prefix.

# Enable OpenLineage integration
debezium.source.openlineage.integration.enabled=true

# Path to OpenLineage configuration file
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml

# Job metadata (optional but recommended)
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=data-engineering
debezium.source.openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer

Configuring the OpenLineage client

Create an openlineage.yml file to configure the OpenLineage client. The openlineage.yml configuration file is used in both Kafka Connect and Debezium Server deployments. Use the following example as a guide:

transport:
  type: http
  url: http://your-openlineage-server:5000
  endpoint: /api/v1/lineage
  auth:
    type: api_key
    api_key: your-api-key

# Alternative: Console transport for testing
# transport:
#   type: console

For detailed OpenLineage client configuration options, refer to the OpenLineage client documentation.

Debezium OpenLineage configuration properties

The following list describes the OpenLineage configuration properties for both deployment types. The property names are shown in their Kafka Connect form.

For Debezium Server, add the debezium.source. prefix to all property names (for example, debezium.source.openlineage.integration.enabled).

openlineage.integration.enabled

Enables or disables the OpenLineage integration.
Required: Yes. Default: false.

openlineage.integration.config.file.path

Path to the OpenLineage YAML configuration file.
Required: Yes. Default: No default value.

openlineage.integration.job.namespace

Namespace used for the job.
Required: No. Default: Value of topic.prefix.

openlineage.integration.job.description

Human-readable job description.
Required: No. Default: No default value.

openlineage.integration.job.tags

Comma-separated list of key-value tags.
Required: No. Default: No default value.

openlineage.integration.job.owners

Comma-separated list of name-role ownership entries.
Required: No. Default: No default value.

openlineage.integration.dataset.kafka.bootstrap.servers

Kafka bootstrap servers used to retrieve Kafka topic metadata. For source connectors, if you do not specify a value, the value of schema.history.internal.kafka.bootstrap.servers is used. For sink connectors, you must specify a value for this property.
Required: Yes (for sink connectors). Default: Value of schema.history.internal.kafka.bootstrap.servers (source connectors only).

Example: Tags list format

Specify tags as a comma-separated list of key-value pairs, as shown in the following example:

openlineage.integration.job.tags=environment=production,team=data-platform,criticality=high

Example: Owners list format

Specify owners as a comma-separated list of name-role pairs, as shown in the following example:

openlineage.integration.job.owners=John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner

Output dataset lineage

Debezium can capture output dataset lineage (Kafka topics) to track the destination of CDC events. The configuration approach differs between Kafka Connect and Debezium Server.

Kafka Connect output dataset lineage

To capture output dataset lineage in Kafka Connect, configure Debezium to use the OpenLineage Single Message Transform (SMT):

# Add OpenLineage transform
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage

# Required: Configure schema history with Kafka bootstrap servers
schema.history.internal.kafka.bootstrap.servers=your-kafka:9092

The SMT captures detailed schema information about the change events that Debezium writes to Kafka topics. The captured schema data includes the following items:

  • Event structure (before, after, source, transaction metadata)

  • Field types and nested structures

  • Topic names and namespaces

Debezium Server output dataset lineage

For Debezium Server deployments, output dataset lineage is automatically captured when OpenLineage integration is enabled. No additional configuration or transformation is required, as Debezium Server has full control over the output records.

Complete configuration examples

The following examples show complete configurations for enabling OpenLineage integration in both Kafka Connect and Debezium Server.

Kafka Connect complete configuration example

The following example shows a complete configuration for enabling a PostgreSQL connector to integrate with OpenLineage in Kafka Connect:

{
  "name": "inventory-connector-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "inventory",
    "snapshot.mode": "initial",
    "slot.name": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "CDC connector for inventory database",
    "openlineage.integration.job.tags": "env=production,team=data-platform,database=postgresql",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "transforms": "openlineage",
    "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
  }
}

Debezium Server complete configuration example

The following example shows a complete application.properties configuration for enabling a PostgreSQL connector to integrate with OpenLineage in Debezium Server with a Kafka sink:

# Sink configuration (Kafka)
debezium.sink.type=kafka
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.bootstrap.servers=kafka:9092

# Source connector configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory

# OpenLineage integration
debezium.source.openlineage.integration.enabled=true
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=cdc
debezium.source.openlineage.integration.job.owners=Mario=maintainer,John Doe=Data scientist

# Logging configuration (optional)
quarkus.log.console.json=false

MongoDB sink connector configuration example

The following example shows a complete configuration for enabling the MongoDB sink connector to integrate with OpenLineage in Kafka Connect:

{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://admin:admin@mongodb:27017",
    "topics": "inventory.inventory.products",
    "sink.database": "inventory2",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for MongoDB",
    "openlineage.integration.job.tags": "env=prod,team=cdc",
    "openlineage.integration.job.owners": "Mario=maintainer,John Doe=Data scientist",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}

For sink connectors, the openlineage.integration.dataset.kafka.bootstrap.servers property is required to retrieve input dataset metadata from Kafka topics. Unlike source connectors, sink connectors do not have direct access to Kafka topic metadata through the Kafka Connect framework and must explicitly connect to retrieve schema information.

JDBC sink connector configuration example

The following example shows a complete configuration for enabling the JDBC sink connector to integrate with OpenLineage in Kafka Connect:

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "topics": "inventory.inventory.customers",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for JDBC",
    "openlineage.integration.job.tags": "env=prod,team=data-engineering",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}

Dataset namespace formatting

Debezium formats dataset namespaces according to the OpenLineage dataset naming specification.

Input dataset namespaces

Input dataset namespaces identify the source database and follow a format specific to each database system.

Example: PostgreSQL input dataset (for source connectors)
  • Namespace: postgres://hostname:port

  • Name: schema.table

  • Schema: Column names and types from the source table

Example: Kafka input dataset (for sink connectors)
  • Namespace: kafka://kafka-broker:9092

  • Name: inventory.inventory.products

  • Schema: CDC event structure from the source connector

The exact namespace format depends on your database system and follows the OpenLineage specification for dataset naming.

Output dataset namespaces for source connectors

Output dataset namespaces identify the Kafka topics where CDC events are written.

Example: Kafka output dataset (for source connectors)
  • Namespace: kafka://bootstrap-server:port

  • Name: topic-prefix.schema.table

  • Schema: Complete CDC event structure including metadata fields

Output dataset namespaces for sink connectors

Output dataset namespaces identify the target databases where sink connectors write data.

Example: MongoDB output dataset
  • Namespace: mongodb://mongodb-host:27017

  • Name: database.collection

  • Schema: Target collection schema

Example: JDBC output dataset (PostgreSQL)
  • Namespace: postgres://postgres-host:5432

  • Name: schema.table

  • Schema: Target table schema

Monitoring and troubleshooting

Verifying the integration

To verify that the OpenLineage integration is working correctly, complete the following steps:

Procedure
  1. Check the connector logs for OpenLineage-related messages.

  2. If you configured HTTP transport, verify that events appear in your OpenLineage backend.

  3. For testing purposes, you can configure console transport to view events directly in the logs, as shown in the following example:

    transport:
      type: console

Common issues
Integration not working
  • Verify that openlineage.integration.enabled is set to true.

  • Check that the path to the OpenLineage configuration file that is specified in the connector configuration is correct, and that Debezium can access the target file.

  • Ensure that the YAML in the OpenLineage configuration file is valid.

  • Verify that all required JAR dependencies are present in the classpath.

Missing output datasets
  • Verify that you configured the connector to use the OpenLineage transformation.

  • Check that you set the property schema.history.internal.kafka.bootstrap.servers in the connector configuration.

Connection issues
  • Verify that you specified the correct server URL and authentication information in the OpenLineage client configuration.

  • Check the network connectivity between Debezium and the OpenLineage server.

Dependency issues
  • Ensure that all required JAR files are present and that their versions are compatible.

  • Check for classpath conflicts with existing dependencies.

Missing input datasets for sink connectors
  • Verify that the openlineage.integration.dataset.kafka.bootstrap.servers property is configured.

  • Verify that the connector has access to the Kafka bootstrap servers.

  • Verify that the Kafka topics specified in the topics configuration exist and that the connector has access to them.

Error events

When the connector fails, check for the following items in OpenLineage FAIL events:

  • Error messages

  • Stack traces

  • Connector configuration for debugging
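
As a rough sketch, assuming that the integration uses the standard OpenLineage errorMessage run facet, a FAIL event might carry failure details in a form similar to the following. The message, stack trace, and identifiers are placeholders:

{
  "eventType": "FAIL",
  "eventTime": "2024-01-01T00:00:00.000Z",
  "run": {
    "runId": "d290f1ee-6c54-4b01-90e6-d701748f0851",
    "facets": {
      "errorMessage": {
        "message": "Connection to database failed",
        "programmingLanguage": "JAVA",
        "stackTrace": "io.debezium.DebeziumException: ..."
      }
    }
  },
  "job": { "namespace": "myNamespace", "name": "inventory.0" }
}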