OpenLineage Integration
- About Data Lineage and OpenLineage
- Deployment types
- How Debezium integrates with OpenLineage
- Required Dependencies
- Configuring the integration
- Configuring the OpenLineage client
- Debezium OpenLineage configuration properties
- Output dataset lineage
- Complete configuration examples
- Dataset namespace formatting
- Monitoring and Troubleshooting
Debezium provides built-in integration with OpenLineage to automatically track data lineage for Change Data Capture (CDC) operations. The OpenLineage integration provides you with comprehensive visibility into the data flow and transformations that you use in your data pipeline.
About Data Lineage and OpenLineage
Data lineage tracks the flow of data through various systems, transformations, and processes. This information provides you with visibility into where data originates, how it moves, and what dependencies exist in the data pipeline. Insight into data lineage is crucial for the following activities:
- Data governance and compliance
- Impact analysis when making changes
- Debugging data quality issues
- Understanding data dependencies
OpenLineage is an open standard for data lineage that provides a unified way to collect and track lineage metadata across multiple data systems. The specification defines a common model for describing datasets, jobs, and runs, simplifying the process of building comprehensive lineage graphs across heterogeneous data infrastructures.
For more information, see the OpenLineage website and documentation.
Deployment types
The Debezium OpenLineage integration is available for the following deployment types:
- Kafka Connect: You run Debezium connectors as Kafka Connect plugins.
- Debezium Server: You run Debezium in standalone server mode.
The same OpenLineage event model and features are available in both deployment types, but the steps for configuring the integration and installing dependencies differ.
How Debezium integrates with OpenLineage
To integrate with OpenLineage, Debezium maps events in its lifecycle to artifacts in the OpenLineage data model.
The Debezium connector is mapped to an OpenLineage Job, which includes the following elements:
- Name: A job inherits its name from the Debezium topic.prefix, combined with the task ID (for example, inventory.0).
- Namespace: Inherited from openlineage.integration.job.namespace, if specified; otherwise defaults to the value of topic.prefix.
- Debezium connector version: The version of the Debezium connector that is running and generating lineage events.
- Complete connector configuration: All connector configuration properties, enabling full reproducibility and debugging of the data pipeline.
- Job metadata: Description, tags, and owners.
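The naming rules above can be sketched in a few lines of Python. This is only an illustration of the rule as described here, not Debezium's implementation; the function names job_name and job_namespace are hypothetical.

```python
from typing import Optional


def job_name(topic_prefix: str, task_id: int) -> str:
    """Combine the topic prefix and the task ID, as in the 'inventory.0' example."""
    return f"{topic_prefix}.{task_id}"


def job_namespace(topic_prefix: str, configured_namespace: Optional[str] = None) -> str:
    """Use openlineage.integration.job.namespace when set; otherwise fall back to topic.prefix."""
    return configured_namespace if configured_namespace else topic_prefix


print(job_name("inventory", 0))
print(job_namespace("inventory"))
print(job_namespace("inventory", "myNamespace"))
```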
The following dataset mappings are possible for source connectors:
- Input datasets: Represent the database tables that Debezium is configured to capture changes from. The OpenLineage integration automatically creates input datasets based on the connector configuration. The integration applies the following principles when it creates dataset mappings:
  - Each table that the connector monitors becomes an input dataset.
  - Each dataset captures schema information for the corresponding source table, including the name and data type of each column.
  - DDL changes in the source table are reflected dynamically in the dataset schema.
- Output datasets: Represent the Kafka topics where CDC events are written. For Kafka Connect deployments, output datasets are created when you apply the OpenLineage single message transformation (SMT). For Debezium Server deployments, output datasets are automatically captured. Output dataset mappings are created according to the following principles:
  - Each Kafka topic that the connector produces becomes an output dataset.
  - The output dataset captures the complete CDC event structure, including metadata fields.
  - The name of the dataset is based on the connector’s topic prefix configuration.
For sink connectors, the data flow is reversed compared to source connectors.
- Input datasets: Represent the Kafka topics that the sink connector reads from. These topics typically contain CDC events from Debezium source connectors. The following principles apply when defining input datasets:
  - Each Kafka topic that the sink connector consumes represents an input dataset.
  - The input dataset specifies the Kafka topic schema and metadata.
  - The namespace format follows kafka://bootstrap-server:port, where the bootstrap server is specified via the openlineage.integration.dataset.kafka.bootstrap.servers property.
- Output datasets: Represent the target databases or collections where the sink connector writes data. The following principles apply when defining output dataset mappings:
  - Each target database table or collection represents an output dataset.
  - The output dataset specifies the schema information for the target database.
  - The namespace format depends on the target database system. For more information, see Dataset namespace formatting.
The following Debezium sink connectors support OpenLineage integration in Kafka Connect:
- MongoDB Sink Connector: Writes CDC events to MongoDB collections.
- JDBC Sink Connector: Writes CDC events to relational database tables.
Debezium Server currently supports OpenLineage integration only with the Kafka sink. The MongoDB and JDBC sink connectors are supported only in Kafka Connect deployments.
When you integrate Debezium with OpenLineage, the connector emits events to report changes of status. The connector emits OpenLineage run events after the following status changes:
- START: Reports connector initialization.
- RUNNING: Emitted periodically during normal streaming operations and while individual tables are processed. These periodic events ensure continuous lineage tracking for long-running streaming CDC operations.
- COMPLETE: Reports that the connector shut down gracefully.
- FAIL: Reports that the connector encountered an error.
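To make the event model more concrete, the following Python sketch assembles a simplified run event that uses the core OpenLineage field names (eventType, eventTime, run, job, inputs, outputs, producer). It is not the exact payload that Debezium emits: real events carry additional fields such as schemaURL and facets. The build_run_event helper and the producer URI are hypothetical.

```python
import json
from datetime import datetime, timezone

# The four status changes listed above.
EVENT_TYPES = ("START", "RUNNING", "COMPLETE", "FAIL")


def build_run_event(event_type, job_namespace, job_name, run_id, inputs=(), outputs=()):
    """Build a simplified OpenLineage-style run event as a plain dictionary."""
    if event_type not in EVENT_TYPES:
        raise ValueError(f"unsupported event type: {event_type}")
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        # Hypothetical producer URI, used for illustration only.
        "producer": "https://example.com/hypothetical-producer",
    }


event = build_run_event(
    "START",
    "inventory",
    "inventory.0",
    "00000000-0000-0000-0000-000000000001",
    inputs=[("postgres://postgres:5432", "inventory.products")],
    outputs=[("kafka://kafka:9092", "inventory.inventory.products")],
)
print(json.dumps(event, indent=2))
```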
Required Dependencies
The OpenLineage integration requires several JAR files that are bundled together in the debezium-openlineage-core-libs archive.
Kafka Connect
Before you can use Debezium with OpenLineage in Kafka Connect, complete the following steps to obtain the required dependencies:
- Download the OpenLineage core archive.
- Extract the contents of the archive into the Debezium plug-in directories in your Kafka Connect environment.
Debezium Server
Before you can use Debezium Server with OpenLineage, complete the following steps to obtain the required dependencies:
- Download the OpenLineage core archive.
- Extract the contents of the archive.
- Copy all JAR files to the /debezium/lib directory in your Debezium Server installation.
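The copy step can be scripted. The following Python sketch assumes that the archive is already extracted; the copy_jars helper is hypothetical, and the demonstration uses temporary directories in place of the real extraction directory and /debezium/lib.

```python
import shutil
import tempfile
from pathlib import Path


def copy_jars(extracted_dir, lib_dir):
    """Copy every .jar found under extracted_dir into lib_dir; return the sorted file names."""
    target = Path(lib_dir)
    target.mkdir(parents=True, exist_ok=True)
    copied = []
    for jar in Path(extracted_dir).rglob("*.jar"):
        shutil.copy2(jar, target / jar.name)
        copied.append(jar.name)
    return sorted(copied)


# Demonstration: temporary directories stand in for the extracted archive
# and for the Debezium Server lib directory.
with tempfile.TemporaryDirectory() as work:
    extracted = Path(work, "extracted")
    (extracted / "nested").mkdir(parents=True)
    (extracted / "example-a.jar").write_bytes(b"")
    (extracted / "nested" / "example-b.jar").write_bytes(b"")
    (extracted / "README.txt").write_text("not a jar")
    print(copy_jars(extracted, Path(work, "lib")))
```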
Configuring the integration
To enable the integration, you must configure the Debezium connector and the OpenLineage client. The configuration approach differs between Kafka Connect and Debezium Server deployments.
Kafka Connect configuration
To enable Debezium to integrate with OpenLineage in Kafka Connect, add properties to your connector configuration, as shown in the following example:
# Enable OpenLineage integration
openlineage.integration.enabled=true
# Path to OpenLineage configuration file
openlineage.integration.config.file.path=/path/to/openlineage.yml
# Job metadata (optional but recommended)
openlineage.integration.job.namespace=myNamespace
openlineage.integration.job.description=CDC connector for products database
openlineage.integration.job.tags=env=prod,team=data-engineering
openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer
Debezium Server configuration
To enable Debezium Server to integrate with OpenLineage, add OpenLineage properties to the application.properties file, as shown in the following example.
Each OpenLineage property takes the debezium.source. prefix.
# Enable OpenLineage integration
debezium.source.openlineage.integration.enabled=true
# Path to OpenLineage configuration file
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
# Job metadata (optional but recommended)
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=data-engineering
debezium.source.openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer
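If you maintain the same settings for both deployment types, the prefix rule is easy to apply mechanically. The following Python sketch shows one way to derive Debezium Server property names from Kafka Connect property names; the to_debezium_server helper is hypothetical and covers only the prefixing described here.

```python
SERVER_PREFIX = "debezium.source."


def to_debezium_server(properties):
    """Return a copy of the properties with the debezium.source. prefix added where missing."""
    return {
        (key if key.startswith(SERVER_PREFIX) else SERVER_PREFIX + key): value
        for key, value in properties.items()
    }


connect_properties = {
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "config/openlineage.yml",
}

# Print the result in application.properties form.
for key, value in to_debezium_server(connect_properties).items():
    print(f"{key}={value}")
```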
Configuring the OpenLineage client
Create an openlineage.yml file to configure the OpenLineage client.
The openlineage.yml configuration file is used in both Kafka Connect and Debezium Server deployments.
Use the following example as a guide:
transport:
  type: http
  url: http://your-openlineage-server:5000
  endpoint: /api/v1/lineage
  auth:
    type: api_key
    api_key: your-api-key

# Alternative: Console transport for testing
# transport:
#   type: console
For detailed OpenLineage client configuration options, refer to the OpenLineage client documentation.
Debezium OpenLineage configuration properties
The following table lists the OpenLineage configuration properties for both deployment types.
For Debezium Server, add the debezium.source. prefix to all property names (for example, debezium.source.openlineage.integration.enabled ).
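Property (Kafka Connect) | Description | Required | Default |
---|---|---|---|
openlineage.integration.enabled | Enables and disables the OpenLineage integration. | Yes | false |
openlineage.integration.config.file.path | Path to the OpenLineage YAML configuration file. | Yes | No default value |
openlineage.integration.job.namespace | Namespace used for the job. | No | Value of topic.prefix |
openlineage.integration.job.description | Human-readable job description. | No | No default value |
openlineage.integration.job.tags | Comma-separated list of key-value tags. | No | No default value |
openlineage.integration.job.owners | Comma-separated list of name-role ownership entries. | No | No default value |
openlineage.integration.dataset.kafka.bootstrap.servers | Kafka bootstrap servers used to retrieve Kafka topic metadata. For source connectors, if you do not specify a value, the value of schema.history.internal.kafka.bootstrap.servers is used. For sink connectors, you must specify a value for this property. | Yes (for sink connectors) | Value of schema.history.internal.kafka.bootstrap.servers |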
Specify Tags as a comma-separated list of key-value pairs, as shown in the following example:
openlineage.integration.job.tags=environment=production,team=data-platform,criticality=high
Specify Owners as a comma-separated list of name-role pairs, as shown in the following example:
openlineage.integration.job.owners=John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner
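As a rough illustration of the two formats, the following Python sketch splits such a value into pairs. It assumes that keys and values contain neither commas nor equals signs. How Debezium itself handles such characters is not described here, and the parse_pairs helper is hypothetical.

```python
def parse_pairs(value):
    """Split 'key=value,key=value' into a list of (key, value) tuples."""
    pairs = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        key, separator, val = entry.partition("=")
        if not separator:
            raise ValueError(f"expected key=value, got {entry!r}")
        pairs.append((key.strip(), val.strip()))
    return pairs


print(parse_pairs("environment=production,team=data-platform,criticality=high"))
print(parse_pairs("John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner"))
```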
Output dataset lineage
Debezium can capture output dataset lineage (Kafka topics) to track the destination of CDC events. The configuration approach differs between Kafka Connect and Debezium Server.
Kafka Connect output dataset lineage
To capture output dataset lineage in Kafka Connect, configure Debezium to use the OpenLineage Single Message Transform (SMT):
# Add OpenLineage transform
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage
# Required: Configure schema history with Kafka bootstrap servers
schema.history.internal.kafka.bootstrap.servers=your-kafka:9092
The SMT captures detailed schema information about change events that Debezium writes to Kafka topics. The transformation captures schema data that includes the following items:
- Event structure (before, after, source, transaction metadata)
- Field types and nested structures
- Topic names and namespaces
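If a connector already defines other transformations, the OpenLineage alias has to be added to the existing comma-separated transforms list rather than replacing it. The following Python sketch shows one way to do that on a configuration dictionary. Appending the alias at the end of the chain is an assumption, because the required position of the SMT is not stated here, and the with_openlineage_smt helper is hypothetical.

```python
SMT_ALIAS = "openlineage"
SMT_CLASS = "io.debezium.transforms.openlineage.OpenLineage"


def with_openlineage_smt(config):
    """Return a copy of the connector config with the OpenLineage SMT registered."""
    updated = dict(config)
    aliases = [a.strip() for a in updated.get("transforms", "").split(",") if a.strip()]
    if SMT_ALIAS not in aliases:
        # Assumption: the SMT goes last in the chain.
        aliases.append(SMT_ALIAS)
    updated["transforms"] = ",".join(aliases)
    updated[f"transforms.{SMT_ALIAS}.type"] = SMT_CLASS
    return updated


print(with_openlineage_smt({"topic.prefix": "inventory", "transforms": "unwrap"}))
```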
Complete configuration examples
The following examples show complete configurations for enabling OpenLineage integration in both Kafka Connect and Debezium Server.
Kafka Connect complete configuration example
The following example shows a complete configuration for enabling a PostgreSQL connector to integrate with OpenLineage in Kafka Connect:
{
  "name": "inventory-connector-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "inventory",
    "snapshot.mode": "initial",
    "slot.name": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "CDC connector for inventory database",
    "openlineage.integration.job.tags": "env=production,team=data-platform,database=postgresql",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "transforms": "openlineage",
    "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
  }
}
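A configuration in this form is typically submitted to the Kafka Connect REST API with a POST request to the /connectors endpoint. The following Python sketch builds such a request with the standard library without sending it. The URL http://localhost:8083 is an assumed worker address, the build_create_request helper is hypothetical, and the configuration is abbreviated to the OpenLineage properties.

```python
import json
import urllib.request


def build_create_request(connect_url, name, config):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(
    "http://localhost:8083",  # Assumed Kafka Connect worker address.
    "inventory-connector-postgres",
    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "topic.prefix": "inventory",
        "openlineage.integration.enabled": "true",
        "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    },
)
print(request.get_method(), request.full_url)
# To submit the connector, pass the request to urllib.request.urlopen(request).
```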
Debezium Server complete configuration example
The following example shows a complete application.properties configuration for enabling a PostgreSQL connector to integrate with OpenLineage in Debezium Server with a Kafka sink:
# Sink configuration (Kafka)
debezium.sink.type=kafka
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.bootstrap.servers=kafka:9092
# Source connector configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory
# OpenLineage integration
debezium.source.openlineage.integration.enabled=true
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=cdc
debezium.source.openlineage.integration.job.owners=Mario=maintainer,John Doe=Data scientist
# Logging configuration (optional)
quarkus.log.console.json=false
MongoDB sink connector configuration example
The following example shows a complete configuration for enabling the MongoDB sink connector to integrate with OpenLineage in Kafka Connect:
{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://admin:admin@mongodb:27017",
    "topics": "inventory.inventory.products",
    "sink.database": "inventory2",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for MongoDB",
    "openlineage.integration.job.tags": "env=prod,team=cdc",
    "openlineage.integration.job.owners": "Mario=maintainer,John Doe=Data scientist",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}
For sink connectors, the openlineage.integration.dataset.kafka.bootstrap.servers property is required to retrieve input dataset metadata from Kafka topics.
Unlike source connectors, sink connectors do not have direct access to Kafka topic metadata through the Kafka Connect framework and must explicitly connect to retrieve schema information.
JDBC sink connector configuration example
The following example shows a complete configuration for enabling the JDBC sink connector to integrate with OpenLineage in Kafka Connect:
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "topics": "inventory.inventory.customers",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for JDBC",
    "openlineage.integration.job.tags": "env=prod,team=data-engineering",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}
Dataset namespace formatting
Debezium formats dataset namespaces according to the OpenLineage dataset naming specification.
Input dataset namespaces
Input dataset namespaces identify the source database and follow a format specific to each database system.
Source connector input (PostgreSQL example):
- Namespace: postgres://hostname:port
- Name: schema.table
- Schema: Column names and types from the source table
Sink connector input (Kafka topic example):
- Namespace: kafka://kafka-broker:9092
- Name: inventory.inventory.products
- Schema: CDC event structure from the source connector
The exact namespace format depends on your database system and follows the OpenLineage specification for dataset naming.
Output dataset namespaces for source connectors
Output dataset namespaces identify the Kafka topics where CDC events are written.
- Namespace: kafka://bootstrap-server:port
- Name: topic-prefix.schema.table
- Schema: Complete CDC event structure including metadata fields
Output dataset namespaces for sink connectors
Output dataset namespaces identify the target databases where sink connectors write data.
MongoDB sink connector:
- Namespace: mongodb://mongodb-host:27017
- Name: database.collection
- Schema: Target collection schema
JDBC sink connector (PostgreSQL example):
- Namespace: postgres://postgres-host:5432
- Name: schema.table
- Schema: Target table schema
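The namespace and name patterns in this section can be expressed as small string-building helpers. The following Python sketch mirrors the patterns shown for a PostgreSQL source and for Kafka topics. It is illustrative only: the function names are hypothetical, and the sketch does not address cases that are not described here, such as multiple bootstrap servers.

```python
def source_input_dataset(host, port, schema, table, scheme="postgres"):
    """Input dataset of a source connector: <scheme>://host:port and schema.table."""
    return {"namespace": f"{scheme}://{host}:{port}", "name": f"{schema}.{table}"}


def kafka_dataset(bootstrap_server, topic):
    """Kafka topic dataset: kafka://bootstrap-server:port and the topic name."""
    return {"namespace": f"kafka://{bootstrap_server}", "name": topic}


def source_output_dataset(bootstrap_server, topic_prefix, schema, table):
    """Output dataset of a source connector: topic-prefix.schema.table on Kafka."""
    return kafka_dataset(bootstrap_server, f"{topic_prefix}.{schema}.{table}")


print(source_input_dataset("postgres", 5432, "inventory", "products"))
print(source_output_dataset("kafka:9092", "inventory", "inventory", "products"))
```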
Monitoring and Troubleshooting
To verify that the OpenLineage integration is working correctly, complete the following steps:
- Check the connector logs for OpenLineage-related messages.
- If you configured HTTP transport, verify that events appear in your OpenLineage backend.
- For testing purposes, you can configure console transport to view events directly in the logs, as shown in the following example:
transport:
  type: console
- Integration not working
  - Verify that openlineage.integration.enabled is set to true.
  - Check that the path to the OpenLineage configuration file that is specified in the connector configuration is correct, and that Debezium can access the target file.
  - Ensure that the YAML in the OpenLineage configuration file is valid.
  - Verify that all required JAR dependencies are present in the classpath.
- Missing output datasets
  - Verify that you configured the connector to use the OpenLineage transformation.
  - Check that you set the property schema.history.internal.kafka.bootstrap.servers in the connector configuration.
- Connection issues
  - Verify that you specified the correct server URL and authentication information in the OpenLineage client configuration.
  - Check the network connectivity between Debezium and the OpenLineage server.
- Dependency issues
  - Ensure that all required JAR files are present and that their versions are compatible.
  - Check for classpath conflicts with existing dependencies.
- Missing input datasets for sink connectors
  - Verify that the openlineage.integration.dataset.kafka.bootstrap.servers property is configured.
  - Verify that the connector has access to the Kafka bootstrap servers.
  - Verify that the Kafka topics specified in the topics configuration exist and that the connector has access to them.
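Several of the checks above can be automated before you deploy a Kafka Connect connector. The following Python sketch inspects a connector configuration dictionary and reports the problems that it can detect. The check_config helper is hypothetical, and it tests the configuration file path on the machine where the script runs, which might not be the file system that the connector sees.

```python
import os

SMT_CLASS = "io.debezium.transforms.openlineage.OpenLineage"
BOOTSTRAP_PROPERTY = "openlineage.integration.dataset.kafka.bootstrap.servers"
HISTORY_PROPERTY = "schema.history.internal.kafka.bootstrap.servers"


def check_config(config, is_sink=False):
    """Return a list of human-readable problems found in a connector configuration."""
    problems = []
    if str(config.get("openlineage.integration.enabled", "")).lower() != "true":
        problems.append("openlineage.integration.enabled is not set to true")
    path = config.get("openlineage.integration.config.file.path")
    if not path:
        problems.append("openlineage.integration.config.file.path is not set")
    elif not os.path.isfile(path):
        problems.append(f"OpenLineage configuration file not found: {path}")
    if is_sink:
        if not config.get(BOOTSTRAP_PROPERTY):
            problems.append(f"{BOOTSTRAP_PROPERTY} is not set")
    else:
        smt_types = [
            value for key, value in config.items()
            if key.startswith("transforms.") and key.endswith(".type")
        ]
        if SMT_CLASS not in smt_types:
            problems.append("OpenLineage SMT is not configured")
        if not config.get(HISTORY_PROPERTY):
            problems.append(f"{HISTORY_PROPERTY} is not set")
    return problems


for problem in check_config({"openlineage.integration.enabled": "true"}):
    print(problem)
```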
When the connector fails, check for the following items in OpenLineage FAIL events:
- Error messages
- Stack traces
- Connector configuration for debugging