OpenLineage Integration
- About Data Lineage and OpenLineage
- How Debezium integrates with OpenLineage
- Required Dependencies
- Obtaining Dependencies
- Configuring the integration
- Configuring connectors with a basic OpenLineage configuration
- Configuring the OpenLineage client
- Debezium OpenLineage configuration properties
- Source connectors output dataset lineage
- Example: Complete connector configuration for enabling OpenLineage integration
- Lineage Events
- Dataset information
- Monitoring and Troubleshooting
Debezium provides built-in integration with OpenLineage to automatically track data lineage for Change Data Capture (CDC) operations. The OpenLineage integration provides comprehensive visibility into how data flows and is transformed in your data pipeline.
About Data Lineage and OpenLineage
Data lineage tracks the flow of data through various systems, transformations, and processes. This information provides you with visibility into where data originates, how it moves, and what dependencies exist in the data pipeline. Insight into data lineage is crucial for the following activities:
- Data governance and compliance
- Impact analysis when making changes
- Debugging data quality issues
- Understanding data dependencies
OpenLineage is an open standard for data lineage that provides a unified way to collect and track lineage metadata across multiple data systems. The specification defines a common model for describing datasets, jobs, and runs, simplifying the process of building comprehensive lineage graphs across heterogeneous data infrastructures.
For more information, see the OpenLineage website and documentation.
How Debezium integrates with OpenLineage
To integrate with OpenLineage, Debezium maps events in its lifecycle to artifacts in the OpenLineage data model.
The Debezium connector is mapped to an OpenLineage Job, which includes the following elements:
- Name: A job inherits its name from the Debezium `topic.prefix.<taskId>`.
- Namespace: Inherited from `openlineage.integration.job.namespace`, if specified; otherwise, defaults to the value of `topic.prefix`.
- Debezium connector version
- Complete connector configuration
- Job metadata: Description, tags, and owners.
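For example, assuming a connector that is configured with `topic.prefix=inventory` and runs as task 0 (illustrative values), the integration reports a job like the one sketched in the following comments:
# Connector configuration (excerpt, illustrative values)
topic.prefix=inventory
openlineage.integration.job.namespace=cdc-prod

# Resulting OpenLineage job (sketch):
#   namespace: cdc-prod        (falls back to "inventory" if the namespace property is unset)
#   name:      inventory.0     (topic.prefix + "." + taskId)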
The following dataset mappings are possible:
- Input datasets: Represent the database tables from which Debezium is configured to capture changes. The OpenLineage integration automatically creates input datasets based on the connector configuration, applying the following principles:
  - Each table that the connector monitors becomes an input dataset.
  - Each dataset captures schema information for the corresponding source table, including the name and data type of each column.
  - DDL changes in the source table are reflected dynamically in the dataset schema.
- Output datasets: Represent the Kafka topics that result after you apply the OpenLineage single message transformation (SMT). Output dataset mappings are created according to the following principles:
  - Each Kafka topic that the connector produces becomes an output dataset.
  - The output dataset captures the complete CDC event structure, including metadata fields.
  - The name of the dataset is based on the connector's topic prefix configuration.

Support for sink connectors is not yet available.
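As an illustration, assuming a PostgreSQL connector that captures the `public.products` table (hypothetical host and table names), the mappings would look roughly like this:
# Connector configuration (excerpt, hypothetical values)
topic.prefix=inventory
table.include.list=public.products

# Resulting lineage (sketch):
#   Input dataset:  namespace postgres://dbhost:5432, name public.products
#   Output dataset: the Kafka topic inventory.public.products
#                   (reported only when the OpenLineage SMT is configured)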
When you integrate Debezium with OpenLineage, the connector emits OpenLineage run events to report changes in its status. The connector emits run events after the following status changes:
- START: Reports connector initialization.
- RUNNING: Emitted periodically during normal streaming operation and while individual tables are processed. These periodic events ensure continuous lineage tracking for long-running streaming CDC operations.
- COMPLETE: Reports that the connector shut down gracefully.
- FAIL: Reports that the connector encountered an error.
Required Dependencies
The OpenLineage integration requires several JAR files that are not bundled with Debezium. To use Debezium with OpenLineage, you must download the JAR files for the following dependencies and add them to your Debezium installation:
| JAR file | Description |
|---|---|
| `openlineage-java-<version>.jar` | Core OpenLineage Java client |
| `commons-codec-<version>.jar` | Apache Commons Codec utilities |
| `httpclient5-<version>.jar` | Apache HTTP client for sending lineage events |
| `httpcore5-<version>.jar` | Apache HTTP core components |
| `httpcore5-h2-<version>.jar` | HTTP/2 support for Apache HTTP core |
| `jackson-dataformat-yaml-<version>.jar` | YAML parsing support for Jackson |
| `jackson-datatype-jsr310-<version>.jar` | Java 8 time API support for Jackson |
| `micrometer-commons-<version>.jar` | Micrometer metrics commons |
| `micrometer-core-<version>.jar` | Micrometer metrics core |
| `snakeyaml-<version>.jar` | YAML parser |
Obtaining Dependencies
The dependencies that are required to support the integration might be available in multiple versions. To install the dependencies, determine the required versions, and then download the JAR files:
1. On Maven Central, open the `openlineage-java` repository and identify the version that your Debezium release uses.
2. View the dependency tree for that `openlineage-java` version and identify the exact versions of all transitive dependencies.
3. Download the required JAR files and place them in the classpath of your Debezium connector.

Dependency versions must be compatible with each other. Always refer to the Maven dependency tree of the specific `openlineage-java` version that you use.
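For example, one way to resolve a consistent set of JAR files is to let Maven download the client together with its transitive dependencies. This is a sketch; `1.x.y` is a placeholder that you must replace with the version that your Debezium release uses:
# Resolve openlineage-java and its transitive dependencies into the local Maven repository
mvn dependency:get -Dartifact=io.openlineage:openlineage-java:1.x.y

# Inspect the resolved artifacts to confirm the transitive versions, then copy
# the JARs from ~/.m2/repository into the connector classpath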
Configuring the integration
To enable the integration, you must configure the Debezium connector and the OpenLineage client.
Configuring connectors with a basic OpenLineage configuration
To enable Debezium to integrate with OpenLineage, add properties to your connector configuration, as shown in the following example:
# Enable OpenLineage integration
openlineage.integration.enabled=true
# Path to OpenLineage configuration file
openlineage.integration.config.file.path=/path/to/openlineage.yml
# Job metadata (optional but recommended)
openlineage.integration.job.namespace=myNamespace
openlineage.integration.job.description=CDC connector for products database
openlineage.integration.job.tags=env=prod,team=data-engineering
openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer
Configuring the OpenLineage client
Create an `openlineage.yml` file to configure the OpenLineage client.
Use the following example as a guide:
transport:
  type: http
  url: http://your-openlineage-server:5000
  endpoint: /api/v1/lineage
  auth:
    type: api_key
    api_key: your-api-key

# Alternative: Console transport for testing
# transport:
#   type: console
For detailed OpenLineage client configuration options, refer to the OpenLineage client documentation.
Debezium OpenLineage configuration properties
| Property | Description | Required | Default |
|---|---|---|---|
| `openlineage.integration.enabled` | Enables or disables the OpenLineage integration. | Yes | `false` |
| `openlineage.integration.config.file.path` | Path to the OpenLineage YAML configuration file. | Yes | No default value |
| `openlineage.integration.job.namespace` | Namespace used for the job. | No | Value of `topic.prefix` |
| `openlineage.integration.job.description` | Human-readable job description. | No | No default value |
| `openlineage.integration.job.tags` | Comma-separated list of key-value tags. | No | No default value |
| `openlineage.integration.job.owners` | Comma-separated list of name-role ownership entries. | No | No default value |
Specify tags as a comma-separated list of key-value pairs, as shown in the following example:
openlineage.integration.job.tags=environment=production,team=data-platform,criticality=high
Specify owners as a comma-separated list of name-role pairs, as shown in the following example:
openlineage.integration.job.owners=John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner
Source connectors output dataset lineage
To capture output dataset lineage (Kafka topics), configure Debezium to use the OpenLineage Single Message Transform (SMT):
# Add OpenLineage transform
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage
# Required: Configure schema history with Kafka bootstrap servers
schema.history.internal.kafka.bootstrap.servers=your-kafka:9092
The SMT captures detailed schema information about the change events that Debezium writes to Kafka topics, including the following items:
- Event structure (before, after, source, transaction metadata)
- Field types and nested structures
- Topic names and namespaces
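For reference, the schema that the SMT reports for an output dataset mirrors the Debezium change event envelope. The following abridged sketch assumes a hypothetical `inventory.public.products` topic:
inventory.public.products (output dataset schema, abridged)
  before    STRUCT    row state before the change (null for inserts)
  after     STRUCT    row state after the change (null for deletes)
  source    STRUCT    connector metadata (connector, db, schema, table, ...)
  op        STRING    operation type: c (create), u (update), d (delete), r (snapshot read)
  ts_ms     INT64     timestamp at which the connector processed the event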
Example: Complete connector configuration for enabling OpenLineage integration
The following example shows a possible complete configuration for enabling a PostgreSQL connector to integrate with OpenLineage:
# Connector basics
name=products-cdc-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=localhost
database.port=5432
database.user=debezium
database.password=debezium
database.dbname=inventory
topic.prefix=inventory
# Snapshot configuration
snapshot.mode=initial
slot.drop.on.stop=false
# OpenLineage integration
openlineage.integration.enabled=true
openlineage.integration.config.file.path=/opt/debezium/config/openlineage.yml
openlineage.integration.job.description=CDC connector for inventory database
openlineage.integration.job.tags=env=production,team=data-platform,database=postgresql
openlineage.integration.job.owners=Data Team=maintainer,Alice Johnson=Data Engineer
# For output lineage (optional)
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage
schema.history.internal.kafka.bootstrap.servers=kafka:9092
# Standard Kafka Connect settings
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
Lineage Events
The integration produces the following types of OpenLineage run events:
- START: Reports connector initialization.
- RUNNING: Reports that the connector is operating normally and is processing tables.
- COMPLETE: Reports that the connector shut down gracefully.
- FAIL: Reports that the connector encountered an error.
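These events follow the OpenLineage run event model. The following abridged example shows the general shape of a RUNNING event; all values are illustrative:
{
  "eventType": "RUNNING",
  "eventTime": "2024-01-15T10:30:00.000Z",
  "run":  { "runId": "d290f1ee-6c54-4b01-90e6-d701748f0851" },
  "job":  { "namespace": "myNamespace", "name": "inventory.0" },
  "inputs":  [ { "namespace": "postgres://dbhost:5432", "name": "public.products" } ],
  "outputs": [ { "namespace": "kafka://kafka:9092", "name": "inventory.public.products" } ]
}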
Dataset information
Input datasets represent source database tables. The namespace follows the OpenLineage dataset naming specification.
The following example shows the dataset naming for a table in a PostgreSQL database:
- Namespace: `postgres://hostname:port`
- Name: `schema.table`
- Schema: Column names and types from the source table
The exact namespace format depends on your database system and follows the OpenLineage specification for dataset naming.
Output datasets represent the Kafka topics that result after you apply the OpenLineage transformation.
An output dataset includes the following information about the Kafka topic:

| Field | Value |
|---|---|
| Namespace | The Kafka cluster, named according to the OpenLineage dataset naming specification (for example, `kafka://<bootstrap-server>:<port>`) |
| Name | The topic name, based on the connector's topic prefix configuration (for example, `<topic.prefix>.<schema>.<table>`) |
| Schema | Complete CDC event structure, including metadata fields |
Monitoring and Troubleshooting
You can perform several tasks to verify that the integration is working as expected:
- Check the connector logs for messages that refer to OpenLineage.
- If you use HTTP transport, verify that events arrive in your OpenLineage backend.
- Use the console transport for testing, as shown in the following example:

transport:
  type: console
- Integration not working
  - Verify that `openlineage.integration.enabled` is set to `true`.
  - Check that the path to the OpenLineage configuration file that is specified in the connector configuration is correct, and that Debezium can access the target file.
  - Ensure that the YAML in the OpenLineage configuration file is valid.
  - Verify that all required JAR dependencies are present in the classpath.
- Missing output datasets
  - Verify that you configured the connector to use the OpenLineage transformation.
  - Check that you set the `schema.history.internal.kafka.bootstrap.servers` property in the connector configuration.
- Connection issues
  - Verify that you specified the correct server URL and authentication information in the OpenLineage client configuration.
  - Check the network connectivity between Debezium and the OpenLineage server.
- Dependency issues
  - Ensure that all required JAR files are present and that their versions are compatible.
  - Check for classpath conflicts with existing dependencies.
When the connector fails, check the OpenLineage FAIL events for the following items:
- Error messages
- Stack traces
- Connector configuration for debugging
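In an OpenLineage FAIL event, these details typically surface through run facets. The following abridged excerpt is illustrative and assumes the standard OpenLineage errorMessage facet:
{
  "eventType": "FAIL",
  "run": {
    "runId": "d290f1ee-6c54-4b01-90e6-d701748f0851",
    "facets": {
      "errorMessage": {
        "message": "Connection to database lost",
        "programmingLanguage": "JAVA",
        "stackTrace": "io.debezium.DebeziumException: ..."
      }
    }
  }
}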