
Debezium sink connector for MongoDB

Overview

The Debezium MongoDB sink connector consumes change event records from Apache Kafka topics and transforms the records into MongoDB documents that it writes to collections in a specified MongoDB sink database. For applications that require high scalability and fast data retrieval, propagating change data to a cluster-based MongoDB environment, which uses such features as sharding and replica sets to optimize read operations, can significantly improve retrieval performance. The connector can process only change events that originate from a Debezium relational database connector.

For information about the MongoDB versions that are compatible with this connector, see the Debezium release overview.

Architecture and how it works

Use the Debezium MongoDB sink connector to stream change data capture (CDC) event records from Kafka topics to a MongoDB sink database. The connector subscribes to Kafka topics that are populated with event messages produced by Debezium relational database source connectors. Each event message describes a database operation (insert, update, or delete) in a structured format that captures the details of the event. The connector transforms incoming change event records into MongoDB document format, and then writes the resulting documents into the target MongoDB collection.

After it receives an event, the connector parses the event payload and determines which MongoDB collection to send it to. Depending on the event type that is specified in the event payload, the connector then performs one of the following operations in the target collection:

Event type in payload    Resulting operation
---------------------    -------------------
INSERT                   Create a document.
UPDATE                   Modify the document with the specified identifier.
DELETE                   Remove the document with the specified identifier.
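
For example, the connector determines the event type from the op field of the Debezium event payload: "c" for an insert (create), "u" for an update, and "d" for a delete. A simplified value payload for an UPDATE event looks like the following example (the schema and source metadata are omitted, and the column names are illustrative):

{
    "op": "u",
    "before": {
        "id": 1001,
        "first_name": "Sally"
    },
    "after": {
        "id": 1001,
        "first_name": "Sally Anne"
    }
}

When the connector receives this event, it modifies the document whose _id matches the event key.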

The connector uses the MongoDB Java driver to interact with the MongoDB database.

The mapping between topics and MongoDB collections is derived from the connector configuration. The document key serves as a unique identifier for the document, ensuring that updates, inserts, and deletions are propagated to the correct MongoDB document and collection, and that operations are applied in the correct order.

Through this process of mapping event messages to MongoDB documents, the connector is able to mirror the state of tables in your relational database to collections in a MongoDB database.

Limitations

The Debezium MongoDB sink connector has the following limitations:

Relational database source connectors only

The MongoDB sink connector can consume only change events that originate from the Debezium connectors for the following relational databases:

  • MariaDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

The connector cannot process change event messages from any other Debezium connectors, including the Debezium MongoDB source connector.

Schema evolution

Because MongoDB is schemaless, the connector has only a limited ability to handle schema evolution. Although it can process basic schema changes, advanced schema evolution scenarios might require manual intervention or specific configuration.

Transaction support

The connector processes individual change events in chronological order, based on the order in which operations are committed in the source system. Although MongoDB supports transactions, the Debezium MongoDB connector does not provide transactional guarantees across multiple CDC events or across multiple documents within a single sink task.

Quick start (using Kafka Connect)

Deploy a basic instance of the MongoDB sink connector for testing.

Prerequisites

The following components are available and running in your environment:

  • Kafka cluster

  • Kafka Connect

  • MongoDB instance

  • Debezium relational database source connector

  • Debezium MongoDB sink connector

Procedure
  1. Configure and start a Debezium source connector, for example, the Debezium PostgreSQL connector, to stream changes from a relational database to Kafka.

  2. Configure and start the Debezium MongoDB sink connector to consume events that the source connector emits to Kafka, and send them to a MongoDB sink database.

    The following example provides a minimal configuration for a Debezium MongoDB sink connector. Replace the placeholders in the example with the actual values for your environment.

    {
      "name": "mongodb-sink-connector",
      "config": {
        "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
        "topics.regex": "server1\.inventory\..*",
        "mongodb.connection.string": "mongodb://localhost:27017",
        "sink.database": "debezium"
      }
    }
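
    After the connector starts, it consumes all topics that match the topics.regex expression. Assuming the default collection naming strategy, a row that Debezium captures from the inventory.customers table and emits to the server1.inventory.customers topic is written to the server1_inventory_customers collection in the debezium database as a document similar to the following example (the field names and values are illustrative):

    {
        "_id": 1001,
        "first_name": "Sally",
        "last_name": "Thomas",
        "email": "sally.thomas@acme.com"
    }

    The Kafka message key, which is derived from the table's primary key, becomes the document's _id.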

Configuration

The MongoDB sink connector accepts a variety of configuration options, as described in the following tables.

Table 1. Required Kafka Connect sink connector configuration properties

connector.class
  Default: no default value
  Must be set to io.debezium.connector.mongodb.sink.MongoDbSinkConnector.

tasks.max
  Default: 1
  Maximum number of tasks to create for this connector.

topics or topics.regex
  Default: no default value
  A comma-separated list of Kafka topics to consume from. Alternatively, set topics.regex to a regular expression; the connector then consumes from all topics whose names match the expression. Set either topics or topics.regex, but not both.

Table 2. Required MongoDB connection properties

mongodb.connection.string
  Default: no default value
  The MongoDB connection string (URI) that the sink connector uses to connect to MongoDB. The URI follows the standard MongoDB connection string format, for example, mongodb://localhost:27017/?replicaSet=my-replica-set.

sink.database
  Default: no default value
  Name of the target MongoDB database.
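
For example, a connection string for a replica set that requires authentication might look like the following (the host names, replica set name, and credentials are placeholders):

    "mongodb.connection.string": "mongodb://dbuser:dbpassword@mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0&authSource=admin"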

Table 3. Sink behavior configuration

collection.naming.strategy
  Default: io.debezium.sink.naming.DefaultCollectionNamingStrategy
  Specifies the strategy that the connector uses to derive the name of the target MongoDB collection from the name of the Kafka topic. Specify one of the following values:

  io.debezium.sink.naming.DefaultCollectionNamingStrategy
    The connector derives the collection name directly from the topic name, replacing dot (period) characters in the name of the source topic with underscores.

  Custom implementation
    The fully-qualified class name of your own CollectionNamingStrategy implementation.

collection.name.format
  Default: ${topic}
  Template for deriving the target collection name from the Kafka topic name.

column.naming.strategy
  Default: io.debezium.sink.naming.DefaultColumnNamingStrategy
  Specifies the strategy that the connector uses to derive the names of document fields in the target collection from the field names in the change event. Specify one of the following values:

  io.debezium.sink.naming.DefaultColumnNamingStrategy
    Use the original event field name as the document field name.

  Custom implementation
    The fully-qualified class name of your own ColumnNamingStrategy implementation.
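
For example, assuming that the collection.name.format property accepts a template with a literal prefix, the following fragment prepends cdc_ to every derived collection name, so that events from the server1.inventory.customers topic are written to the cdc_server1_inventory_customers collection after the default naming strategy replaces the dots:

    "collection.name.format": "cdc_${topic}"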

Table 4. Common sink options

field.include.list
  Default: empty string
  An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName. If you include this property in the configuration, do not also set the field.exclude.list property.

field.exclude.list
  Default: empty string
  An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName. If you include this property in the configuration, do not also set the field.include.list property.

batch.size
  Default: 2048
  Maximum number of records that the connector writes to MongoDB in a single batch.
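
For example, to keep sensitive fields out of the sink database, you might exclude one field across all topics and another field for a single topic only (the topic and field names in this fragment are illustrative):

    "field.exclude.list": "ssn,dbserver1.inventory.customers:credit_card_number"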

Example configuration

The following example shows how you might configure the connector to read change events from three specific topics that carry changes from the dbserver1.inventory database, and write the resulting documents to collections in the MongoDB sink database named debezium.

{
    "name": "mongodb-sink-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
        "topics": "dbserver1.inventory.customers,dbserver1.inventory.orders,dbserver1.inventory.products",
        "mongodb.connection.string": "mongodb://localhost:27017",
        "sink.database": "debezium"
    }
}

Monitoring

This release of the connector does not expose any metrics.

Key field mapping

When the connector processes events, it maps data to specific fields in the target MongoDB document.

  • Keys from Debezium change events, that is, the Kafka message keys, are mapped to the MongoDB _id field by default.

  • Event values are mapped to the fields of the MongoDB document.

  • Updates and deletes are resolved based on the key field mapping.

The following example shows an event key in a Kafka topic:

{
    "userId": 1,
    "orderId": 1
}

Based on the mapping logic, the preceding key is mapped to the _id field in a MongoDB document, as shown in the following example:

{
    "_id": {
        "userId": 1,
        "orderId": 1
    }
}

Using CloudEvents with the Debezium MongoDB sink connector

The Debezium MongoDB sink connector can consume records that are serialized as CloudEvents. Debezium can emit change events in CloudEvents format, so that the event payload is encapsulated in a standardized envelope.

When you enable CloudEvents on the source connector, the MongoDB sink connector parses the CloudEvents envelope, extracts the original Debezium event payload from the data section, and then applies the event to the target MongoDB collection, following the standard insert, update, or delete semantics.

This process makes it possible to integrate Debezium with broader event-driven systems while still persisting the resulting events in MongoDB.
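
For example, a change event that a source connector emits in CloudEvents format wraps the Debezium payload in the data attribute, alongside standard CloudEvents attributes such as id, source, and type. A simplified event might look like the following example (the attribute values vary by source connector and configuration):

{
    "id": "name:dbserver1;lsn:24023128",
    "source": "/debezium/postgresql/dbserver1",
    "specversion": "1.0",
    "type": "io.debezium.postgresql.datachangeevent",
    "data": {
        "op": "c",
        "after": {
            "id": 1001,
            "first_name": "Sally"
        }
    }
}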

Table 5. CloudEvents sink options

cloudevents.schema.name.pattern
  Default: .*CloudEvents\.Envelope$
  Regular expression pattern that the connector uses to identify CloudEvents messages by matching the schema name against the pattern.