Integration Testing with Testcontainers

This feature is currently in incubating state, i.e. exact semantics, configuration options, APIs etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems will using this extension.

Overview

When setting up change data capture pipelines with Debezium, it’s a good idea to also have some automated testing in place, in order to make sure that

  • the source database is set up so changes can be streamed off of it

  • your connectors are configured correctly

The Debezium extension for Testcontainers aims at simplying such tests, by running all the required infrastructure (Apache Kafka, Kafka Connect etc.) via Linux containers and making it easily accessible to Java-based tests.

It applies sensible defaults as far as possible (e.g. database credentials for connectors can be obtained from the configured database container), allowing you to focus on the essential logic of your tests.

Getting Started

In order to use Debezium’s Testcontainers integration, add the following dependency to your project:

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-testing-testcontainers</artifactId>
  <version>3.0.6.Final</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <scope>test</scope>
</dependency>

<!-- Add the TC dependency matching your database -->
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>

Depending on your testing strategy, you may also need the JDBC driver of your database and a client for Apache Kafka, so you can insert some test data and assert the corresponding change events in Kafka.

Test Set-Up

When writing an integration test for a Debezium connector configuration, you’ll also need to set up Apache Kafka and a database which should be the source of change events. The existing Testcontainers support for Apache Kafka and databases can be used for that.

Together with Debezium’s DebeziumContainer class, a typical set-up will look like this:

public class DebeziumContainerTest {

    private static Network network = Network.newNetwork(); (1)

    private static KafkaContainer kafkaContainer = new KafkaContainer()
            .withNetwork(network); (2)

    public static PostgreSQLContainer<?> postgresContainer =
            new PostgreSQLContainer<>(
                    DockerImageName.parse("quay.io/debezium/postgres:15")
                        .asCompatibleSubstituteFor("postgres"))
                .withNetwork(network)
                .withNetworkAliases("postgres"); (3)

    public static DebeziumContainer debeziumContainer =
            new DebeziumContainer("quay.io/debezium/connect:3.0.6.Final")
                .withNetwork(network)
                .withKafka(kafkaContainer)
                .dependsOn(kafkaContainer); (4)

    @BeforeClass
    public static void startContainers() { (5)
        Startables.deepStart(Stream.of(
                kafkaContainer, postgresContainer, debeziumContainer))
                .join();
    }
}
1 Define a Docker network to be used by all the services
2 Set up a container for Apache Kafka
3 Set up a container for Postgres 15 (using Debezium’s Postgres container image)
4 Set up a container for Kafka Connect with Debezium 3.0.6.Final
5 Start all three containers

Test Implementation

Having declared all the required containers, you can now register an instance of the Debezium Postgres connector, insert some test data into Postgres and use the Apache Kafka client for reading the expected change event records from the corresponding Kafka topic:

@Test
public void canRegisterPostgreSqlConnector() throws Exception {
    try (Connection connection = getConnection(postgresContainer);
            Statement statement = connection.createStatement();
            KafkaConsumer<String, String> consumer = getConsumer(
                    kafkaContainer)) {

        statement.execute("create schema todo"); (1)
        statement.execute("create table todo.Todo (id int8 not null, " +
                "title varchar(255), primary key (id))");
        statement.execute("alter table todo.Todo replica identity full");
        statement.execute("insert into todo.Todo values (1, " +
                "'Learn CDC')");
        statement.execute("insert into todo.Todo values (2, " +
                "'Learn Debezium')");

        ConnectorConfiguration connector = ConnectorConfiguration
                .forJdbcContainer(postgresContainer)
                .with("topic.prefix", "dbserver1");

        debeziumContainer.registerConnector("my-connector",
                connector); (2)

        consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));

        List<ConsumerRecord<String, String>> changeEvents =
                drain(consumer, 2); (3)

        assertThat(JsonPath.<Integer> read(changeEvents.get(0).key(),
                "$.id")).isEqualTo(1);
        assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
                "$.op")).isEqualTo("r");
        assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
                "$.after.title")).isEqualTo("Learn CDC");

        assertThat(JsonPath.<Integer> read(changeEvents.get(1).key(),
                "$.id")).isEqualTo(2);
        assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
                "$.op")).isEqualTo("r");
        assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
                "$.after.title")).isEqualTo("Learn Debezium");

        consumer.unsubscribe();
    }
}

// Helper methods below

private Connection getConnection(
        PostgreSQLContainer<?> postgresContainer)
                throws SQLException {

    return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
            postgresContainer.getUsername(),
            postgresContainer.getPassword());
}

private KafkaConsumer<String, String> getConsumer(
            KafkaContainer kafkaContainer) {

    return new KafkaConsumer<>(
            Map.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            kafkaContainer.getBootstrapServers(),
                    ConsumerConfig.GROUP_ID_CONFIG,
                            "tc-" + UUID.randomUUID(),
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                            "earliest"),
            new StringDeserializer(),
            new StringDeserializer());
}

private List<ConsumerRecord<String, String>> drain(
        KafkaConsumer<String, String> consumer,
        int expectedRecordCount) {

    List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();

    Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
        consumer.poll(Duration.ofMillis(50))
                .iterator()
                .forEachRemaining(allRecords::add);

        return allRecords.size() == expectedRecordCount;
    });

    return allRecords;
}
1 Create a table in the Postgres database and insert two records
2 Register an instance of the Debezium Postgres connector; the connector type as well as properties such as database host, database name, user etc. are derived from the database container
3 Read two records from the change event topic in Kafka and assert their attributes