Debezium sink connector for MongoDB
Overview
The Debezium MongoDB sink connector consumes change event records from Apache Kafka topics and transforms them into MongoDB documents, which it writes to collections in a specified MongoDB sink database. For applications that require high scalability and fast data retrieval, propagating change data to a cluster-based MongoDB environment, which uses features such as sharding and replica sets to optimize read operations, can significantly improve retrieval performance. The connector can process only change events that originate from a Debezium relational database connector.
For information about the MongoDB versions that are compatible with this connector, see the Debezium release overview.
Architecture and how it works
Use the Debezium MongoDB sink connector to stream change data capture (CDC) event records from Kafka topics to a MongoDB sink database. The connector subscribes to Kafka topics that are populated with event messages produced by Debezium relational database source connectors. Each event message describes a database operation (insert, update, or delete) in a structured format that captures the details of the event. The connector transforms incoming change event records into MongoDB document format, and then writes the resulting documents into the target MongoDB collection.
After it receives an event, the connector parses the event payload and determines which MongoDB collection to send it to. Depending on the event type that is specified in the event payload, the connector then performs one of the following operations in the target collection:
Event type in payload | Resulting operation |
---|---|
INSERT | Create document |
UPDATE | Modify document with the specified identifier. |
DELETE | Remove document with the specified identifier. |
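In Debezium change events, the event type is carried in the op field of the event value: c for an insert, u for an update, and d for a delete. The following simplified example, which omits the schema portion and uses illustrative field values, shows the value of an update event for a customers table:
{
  "before": { "id": 1001, "email": "old@example.com" },
  "after": { "id": 1001, "email": "new@example.com" },
  "source": { "connector": "postgresql", "db": "inventory", "table": "customers" },
  "op": "u",
  "ts_ms": 1700000000000
}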
The connector uses the MongoDB Java driver to interact with the MongoDB database.
The mapping between topics and MongoDB collections is derived from the connector configuration. The document key serves as a unique identifier for the document, ensuring that updates, inserts, and deletions are propagated to the correct MongoDB document and collection, and that operations are applied in the correct order.
Through this process of mapping event messages to MongoDB documents, the connector is able to mirror the state of tables in your relational database to collections in a MongoDB database.
Limitations
The Debezium MongoDB sink connector has the following limitations:
- Relational database (RDBMS) source connectors only
  The MongoDB sink connector can consume only change events that originate from the Debezium connectors for the following relational databases:
  - MariaDB
  - MySQL
  - Oracle
  - PostgreSQL
  - SQL Server
  The connector cannot process change event messages from any other Debezium connectors, including the Debezium MongoDB source connector.
- Schema evolution
  Although the connector can process basic schema changes, advanced schema evolution scenarios might require manual intervention or specific configuration. Because MongoDB is schemaless, the connector has only a limited ability to handle schema evolution.
- Transaction support
  The connector processes individual change events in chronological order, based on the order in which operations are committed in the source system. Although MongoDB supports transactions, the Debezium MongoDB sink connector does not provide transactional guarantees across multiple CDC events or across multiple documents within a single sink task.
Quick Start (using Kafka Connect)
Deploy a basic instance of the MongoDB sink connector for testing.
Before you begin, ensure that the following components are available and running in your environment:
- Kafka cluster
- Kafka Connect
- A MongoDB instance
- Debezium relational database source connector
- Debezium MongoDB sink connector
To deploy the connector, complete the following steps:
- Configure and start a Debezium source connector, for example, a Debezium PostgreSQL connector, to stream changes from a relational database to Kafka, as shown in the first example that follows this list.
- Configure and start the Debezium MongoDB sink connector to consume the events that the source connector emits to Kafka, and send them to a MongoDB sink database.
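The following example shows one possible minimal configuration for a Debezium PostgreSQL source connector. The connector name, connection values, and table list are placeholders for your environment; the topic.prefix value of server1 is chosen to match the topics.regex pattern in the sink configuration that follows:
{
  "name": "inventory-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "topic.prefix": "server1",
    "table.include.list": "inventory.*"
  }
}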
The following example provides a minimal configuration for a Debezium MongoDB sink connector. Replace the placeholders in the example with the actual values for your environment.
{ "name": "mongodb-sink-connector", "config": { "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector", "topics.regex": "server1\.inventory\..*", "mongodb.connection.string": "mongodb://localhost:27017", "sink.database": "debezium" } }
Configuration
The MongoDB sink connector accepts a variety of configuration options, as described in the following tables.
Property | Default | Description |
---|---|---|
connector.class | No default value | Must be set to io.debezium.connector.mongodb.sink.MongoDbSinkConnector. |
tasks.max | 1 | Maximum number of tasks. |
topics | No default value | List of Kafka topics to consume from. If you set this property, do not also set the topics.regex property. |
Property | Default | Description |
---|---|---|
mongodb.connection.string | No default value | MongoDB connection string (URI) that the sink connector uses to connect to MongoDB. The URI follows the standard MongoDB connection string format, for example, mongodb://localhost:27017. |
sink.database | No default value | Name of the target MongoDB database. |
Property | Default | Description |
---|---|---|
| | Specifies the strategy that the connector uses to derive the name of the target MongoDB collection from the name of the Kafka topic. |
| | Template for deriving the target collection name from the Kafka topic name. |
| | Specifies the strategy that the connector uses to name fields in the target collection. |
Property | Default | Description |
---|---|---|
field.include.list | empty string | An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName. If you include this property in the configuration, do not also set the field.exclude.list property. |
field.exclude.list | empty string | An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName. If you include this property in the configuration, do not also set the field.include.list property. |
batch.size | 2048 | Maximum number of records to write in a single batch. |
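As a sketch of field filtering, the following configuration writes only selected fields of the change event value to MongoDB. It assumes that the include-list property is named field.include.list and that fields are qualified as topicName:fieldName, following the convention that other Debezium sink connectors use; both are assumptions rather than confirmed names for your connector version.
{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
    "topics": "dbserver1.inventory.customers",
    "mongodb.connection.string": "mongodb://localhost:27017",
    "sink.database": "debezium",
    "field.include.list": "dbserver1.inventory.customers:id,dbserver1.inventory.customers:email"
  }
}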
Example configuration
The following example shows how you might configure the connector to read change events from three specific topics that originate from the dbserver1.inventory database, and to modify the corresponding collections in the MongoDB sink database named debezium.
{
"name": "mongodb-sink-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
"topics": "dbserver1.inventory.customers,dbserver1.inventory.orders,dbserver1.inventory.products",
"mongodb.connection.string": "mongodb://localhost:27017",
"sink.database": "debezium"
}
}
Key field mapping
When the connector processes events, it maps data to specific fields in the target MongoDB document.
- Keys from Debezium change events, such as Kafka message keys, are mapped to the MongoDB _id field by default.
- Values are mapped into MongoDB documents.
- Updates and deletes are resolved based on the key field mapping.
The following example shows an event key in a Kafka topic:
{
"userId": 1,
"orderId": 1
}
Based on the mapping logic, the preceding key is mapped to the _id field in a MongoDB document, as shown in the following example:
{
"_id": {
"userId": 1,
"orderId": 1
}
}
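To illustrate how the key and the value fields fit together, assume that the corresponding event value reports an after state with userId, orderId, and quantity fields (illustrative names and values). The document that the connector writes might then look similar to the following:
{
  "_id": {
    "userId": 1,
    "orderId": 1
  },
  "userId": 1,
  "orderId": 1,
  "quantity": 3
}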
Using CloudEvents with the Debezium MongoDB sink connector
The Debezium MongoDB sink connector can consume records serialized as CloudEvents. Debezium can emit change events in CloudEvents format, so that the event payload is encapsulated in a standardized envelope.
When you enable CloudEvents on the source connector, the MongoDB sink connector parses the CloudEvents envelope and extracts the actual Debezium event payload from the data section. The event is then applied to the target MongoDB collection, following the standard insert, update, or delete semantics. This process makes it possible to integrate Debezium with broader event-driven systems while still persisting the resulting events in MongoDB.
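For illustration, the following simplified example shows a CloudEvents envelope that carries a Debezium change event in its data section. The attribute names come from the CloudEvents specification; the attribute values are illustrative and vary by source connector and converter settings:
{
  "id": "name:server1;lsn:29274832",
  "source": "/debezium/postgresql/server1",
  "specversion": "1.0",
  "type": "io.debezium.postgresql.datachangeevent",
  "datacontenttype": "application/json",
  "data": {
    "before": null,
    "after": { "id": 1001, "email": "new@example.com" },
    "op": "c"
  }
}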
Property | Default | Description |
---|---|---|
| | Regular expression pattern to identify CloudEvents messages by matching the schema name with this pattern. |