

Streaming Cassandra at WePay - Part 2

This post originally appeared on the WePay Engineering blog.

In the first half of this blog post series, we explained our decision-making process of designing a streaming data pipeline for Cassandra at WePay. In this post, we will break down the pipeline into three sections and discuss each of them in more detail:

  1. Cassandra to Kafka with CDC agent

  2. Kafka to BigQuery with KCBQ

  3. Transformation with BigQuery view

Cassandra to Kafka with CDC Agent

The Cassandra CDC agent is a JVM process that is intended to be deployed on each node in a Cassandra cluster. The agent is composed of several interdependent processors that run concurrently and work together to publish change events to Kafka.

Snapshot Processor

This processor is responsible for bootstrapping new tables. It looks up the CDC configuration to determine the snapshot mode, and performs a snapshot of CDC-enabled tables if needed. To snapshot a table, the agent performs a full table scan, converts each row in the result set into an individual create event, and then sequentially enqueues these events into an in-memory BlockingQueue.
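For illustration, a snapshot pass built on the DataStax Java driver could look roughly like the sketch below. This is not the connector's actual code; the ChangeEvent class and the shared queue are hypothetical stand-ins for the agent's internal structures.

import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

public class SnapshotProcessorSketch {

    // Hypothetical change-event holder; the real agent uses a richer structure.
    public static final class ChangeEvent {
        public final String table;
        public final Map<String, Object> columns;
        public final long timestampMicros;

        public ChangeEvent(String table, Map<String, Object> columns, long timestampMicros) {
            this.table = table;
            this.columns = columns;
            this.timestampMicros = timestampMicros;
        }
    }

    static void snapshotTable(Session session, String keyspace, String table,
                              BlockingQueue<ChangeEvent> queue) throws InterruptedException {
        // Full table scan; each row in the result set becomes an individual create event.
        ResultSet rows = session.execute("SELECT * FROM " + keyspace + "." + table);
        for (Row row : rows) {
            Map<String, Object> columns = new HashMap<>();
            for (ColumnDefinitions.Definition def : row.getColumnDefinitions()) {
                columns.put(def.getName(), row.getObject(def.getName()));
            }
            // Enqueue sequentially into the shared in-memory queue.
            queue.put(new ChangeEvent(keyspace + "." + table, columns,
                    System.currentTimeMillis() * 1000));
        }
    }
}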

Commit Log Processor

This processor is responsible for watching the CDC directory for new commit logs, parsing the commit log files via Cassandra’s CommitLogReader, transforming deserialized mutations into standardized change events, and finally enqueuing them to the same queue as the snapshot processor.
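A minimal sketch of the directory-watching portion is shown below, using java.nio's WatchService; the actual deserialization via CommitLogReader is reduced to a placeholder method, and the CDC directory path is an assumption.

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class CommitLogWatcherSketch {

    public static void watchCdcDir(Path cdcDir) throws Exception {
        WatchService watcher = FileSystems.getDefault().newWatchService();
        // Commit log segments are moved into the CDC directory when they are flushed.
        cdcDir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

        while (true) {
            WatchKey key = watcher.take(); // blocks until a new file shows up
            for (WatchEvent<?> event : key.pollEvents()) {
                if (event.kind() != StandardWatchEventKinds.ENTRY_CREATE) {
                    continue; // ignore overflow events in this sketch
                }
                Path newFile = cdcDir.resolve((Path) event.context());
                if (newFile.getFileName().toString().startsWith("CommitLog")) {
                    processCommitLog(newFile);
                }
            }
            key.reset();
        }
    }

    private static void processCommitLog(Path commitLog) {
        // Placeholder: this is where Cassandra's CommitLogReader would deserialize
        // mutations, which are then transformed into change events and enqueued
        // onto the same BlockingQueue used by the snapshot processor.
        System.out.println("New commit log segment: " + commitLog);
    }

    public static void main(String[] args) throws Exception {
        watchCdcDir(Paths.get("/var/lib/cassandra/cdc_raw")); // assumed CDC directory
    }
}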

At this point, some readers may have concerns about running the Snapshot Processor and the Commit Log Processor concurrently rather than serially. The reason this is safe is that Cassandra uses a client-side timestamp to determine event order and resolves conflicts with last write wins. This client-side timestamp is deliberately stored in each change event. This is why snapshotting doesn’t have to precede commit log processing: the ordering is determined later, when the data is queried in the data warehouse.

Queue Processor

This processor is responsible for dequeuing change events, transforming them into Avro records, and sending them to Kafka via a Kafka producer. It also tracks the position of the most recently sent event, so that on restart it is able to pick up from where it left off.
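A hedged sketch of the dequeue-and-publish loop follows, reusing the hypothetical ChangeEvent type from the snapshot sketch above. The broker address, the Avro serialization, and the persistence of the commit log position are assumptions reduced to placeholders.

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QueueProcessorSketch {

    public static void run(BlockingQueue<SnapshotProcessorSketch.ChangeEvent> queue,
                           String topicPrefix) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (Producer<String, byte[]> producer = new KafkaProducer<>(props)) {
            while (!Thread.currentThread().isInterrupted()) {
                SnapshotProcessorSketch.ChangeEvent event = queue.take(); // blocks until available
                byte[] value = serializeToAvro(event);                    // Avro serialization elided
                producer.send(new ProducerRecord<>(topicPrefix + event.table, event.table, value),
                        (metadata, exception) -> {
                            if (exception == null) {
                                // Record the position of the most recently sent event so the
                                // agent can resume from here after a restart.
                                markProcessed(event);
                            }
                        });
            }
        }
    }

    private static byte[] serializeToAvro(SnapshotProcessorSketch.ChangeEvent event) {
        return new byte[0]; // placeholder
    }

    private static void markProcessed(SnapshotProcessorSketch.ChangeEvent event) {
        // placeholder for persisting the processed commit log position
    }
}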

Implementing an in-memory queue in the CDC agent seems like overkill at first. Given that there is only a single thread doing the enqueue and another thread doing the dequeue, the performance boost is negligible. The motivation here is to decouple the work of parsing commit logs, which must be done serially in the right order, from the work of serializing and publishing Kafka events, which can be parallelized across multiple threads for different tables. Although such parallelization is not implemented at the moment, we want the flexibility to add this feature in the near future.

Some may also wonder why Kafka Connect is not used here, as it seems like a natural fit for streaming. It would be a great option if we wanted distributed parallel processing with fault tolerance. However, it is more complicated to deploy, monitor, and debug than a plain Kafka producer. For the purpose of building a minimum viable infrastructure, we chose the Kafka producer at the time.

Schema Processor

In order to support automatic schema evolution, this processor periodically polls the database for the latest table schema and updates the in-memory schema cache if a change is detected. The Snapshot Processor and Commit Log Processor both look up the table schema in this cache and attach it to each change event prior to enqueue. Upon dequeue, the Queue Processor transforms the attached table schema into an Avro schema for record serialization.
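Conceptually, the schema cache can be as simple as the following sketch: a scheduled task periodically refreshes a concurrent map keyed by table name, which the other processors read from. The fetchSchema and cdcEnabledTables calls are hypothetical placeholders for queries against Cassandra's schema metadata.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchemaProcessorSketch {

    // table name -> latest known schema (represented as a plain string for brevity)
    private final Map<String, String> schemaCache = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start(long pollIntervalSeconds) {
        scheduler.scheduleAtFixedRate(this::refresh, 0, pollIntervalSeconds, TimeUnit.SECONDS);
    }

    private void refresh() {
        for (String table : cdcEnabledTables()) {
            String latest = fetchSchema(table);
            // Only replace the cached entry when a change is detected.
            schemaCache.merge(table, latest, (old, fresh) -> old.equals(fresh) ? old : fresh);
        }
    }

    /** Looked up by the Snapshot and Commit Log processors before enqueuing a change event. */
    public String lookup(String table) {
        return schemaCache.get(table);
    }

    // Placeholders for the parts that would query Cassandra's schema metadata.
    private List<String> cdcEnabledTables() { return List.of("ks.table_a"); }
    private String fetchSchema(String table) { return "schema-of-" + table; }
}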

Commit Log Post Processor

This processor is responsible for cleaning up commit logs after they have been processed. The default Commit Log Post Processor implementation simply performs deletion. A custom Commit Log Post Processor can be configured for use cases such as archiving commit log files to S3 or GCS.
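A sketch of what a pluggable post processor might look like is shown below, with deletion as the default and a local archiving variant standing in for an S3/GCS upload; the interface name is illustrative, not the connector's actual API.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Illustrative contract: invoked once a commit log segment has been fully processed. */
interface CommitLogPostProcessor {
    void handle(Path processedCommitLog) throws IOException;
}

/** Default behaviour: simply delete the processed segment to free up CDC disk space. */
class DeletingPostProcessor implements CommitLogPostProcessor {
    @Override
    public void handle(Path processedCommitLog) throws IOException {
        Files.deleteIfExists(processedCommitLog);
    }
}

/** Alternative: move the segment into an archive directory, standing in for an S3/GCS upload. */
class ArchivingPostProcessor implements CommitLogPostProcessor {
    private final Path archiveDir;

    ArchivingPostProcessor(Path archiveDir) {
        this.archiveDir = archiveDir;
    }

    @Override
    public void handle(Path processedCommitLog) throws IOException {
        Files.createDirectories(archiveDir);
        // In a real setup the file would be uploaded to S3 or GCS here before local removal.
        Files.move(processedCommitLog, archiveDir.resolve(processedCommitLog.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }
}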

Kafka to BigQuery with KCBQ

Once the events arrive in Kafka, we use KCBQ to send the event data to BigQuery without performing special transformations, just like in our MySQL streaming data pipeline. We have written a previous blog post explaining this connector in more detail.

Transformation with BigQuery View

Once the events are in BigQuery, this is where the heavy lifting is done. We create virtual views on top of the raw tables to merge the data in a way that mirrors the source table in Cassandra. Note that each row in the raw tables contains limited data: only columns that have been modified carry state. This means that selecting the latest row for each primary key will not produce data that is consistent with the source. Instead, the query must identify the latest cell of each column for each primary key. This can be achieved with self-joins on the primary key, one for each column in the table. Although such joins would be slow in MySQL, BigQuery’s parallel execution engine and columnar storage make this feasible: a view on top of a 1TB Cassandra table in BigQuery takes about 100 seconds to query.

Compaction

Because the BigQuery view is virtual, each query of the view essentially triggers a full compaction of the raw data. This means the cost grows with the number of queries, and the duplicated events amplify the amount of data that needs to be processed by a factor of N, where N is the replication factor. To save cost and improve performance, periodic compaction by materializing the view is necessary.

Future Development Work

Support for Cassandra 4.0

In Cassandra 4.0, the improved CDC feature allows the connector to parse events in real time as they are written, rather than in micro-batches on each commit log flush. This reduces latency substantially.

Performance Optimization

As mentioned earlier, a single thread is responsible for dequeuing, serializing, and publishing Kafka records. If the agent cannot keep up as write throughput increases, a backlog of unprocessed commit logs will build up, which could eventually impact the health of our production database. The next step is to leverage parallel processing of events to optimize performance.

Streamline with Debezium and Kafka Connect

We initially built the Cassandra CDC agent as a standalone project. Now that it is open-sourced as a Debezium connector, we can replace some of our custom classes with existing ones in Debezium. Another improvement is to support common features that all Debezium connectors have, such as support for multiple serialization formats. Finally, the CDC agent is not fault tolerant, so robust alerting and monitoring are required as part of deployment. One area to explore in the future is building the CDC agent on top of Kafka Connect as a source connector; this would further align the Cassandra connector with the other Debezium connectors and provide scalability and fault tolerance for free.

Closing Remarks

Cassandra, being a peer-to-peer distributed database, poses some really interesting challenges for CDC that do not exist in relational databases like MySQL and Postgres, or even in a single-master NoSQL database like MongoDB. It is worth evaluating these limitations before rolling out your own real-time data pipeline for Cassandra.

Besides understanding Cassandra internals, we learned a few lessons on engineering productivity along the way:

Minimum Viable Product Philosophy

By stripping away all features except for the essentials, we were able to build, test, and deploy a working solution in a reasonable time with limited resources. Had we aimed to design a pipeline that encompasses all features upfront, it would have taken a lot longer and required much more resources.

Community Involvement

Cassandra is an open-source project. Rather than tackling the problem solo, we engaged with the Cassandra community from the very start (sharing experiences with committers and users via meetups, discussing proposals on the mailing list, presenting proofs of concept at conferences, etc.), all of which provided us with valuable feedback throughout the design and implementation stages.


Streaming Cassandra at WePay - Part 1

This post originally appeared on the WePay Engineering blog.

Historically, MySQL had been the de facto database of choice for microservices at WePay. As WePay scales, the sheer volume of data written into some of our microservice databases forced us to make a scaling decision between sharded MySQL (i.e. Vitess) and a natively sharded NoSQL database. After a series of evaluations, we picked Cassandra, a NoSQL database, primarily because of its high availability, horizontal scalability, and ability to handle high write throughput.

Batch ETL Options

After introducing Cassandra to our infrastructure, our next challenge was to figure out a way to expose data in Cassandra to BigQuery, our data warehouse, for analytics and reporting. We quickly built an Airflow hook and operator to execute full loads. This obviously doesn’t scale, as it rewrites the entire database on each load. To scale the pipeline, we evaluated two incremental load approaches, but both have their shortcomings:

  1. Range query. This is a common ETL approach where data is extracted via a range query at regular intervals, such as hourly or daily. Anyone familiar with Cassandra data modelling would quickly realize how unrealistic this approach is. Cassandra tables need to be modeled to optimize the query patterns used in production. Adding this query pattern for analytics in most cases means cloning the table with different clustering keys. RDBMS folks might suggest a secondary index to support this query pattern, but secondary indexes in Cassandra are local, so this approach would pose performance and scaling issues of its own.

  2. Process unmerged SSTables. SSTables are Cassandra’s immutable storage files. Cassandra offers an sstabledump CLI command that converts SSTable content into human-readable JSON. However, Cassandra is built on the concept of a Log-Structured Merge (LSM) tree, meaning SSTables are periodically merged into new compacted files. Depending on the compaction strategy, detecting unmerged SSTable files out-of-band may be challenging. (We later learned about the incremental backup feature in Cassandra, which only backs up uncompacted SSTables, so this approach could have worked as well.)

Given these challenges, and having built and operated a streaming data pipeline for MySQL, we began to explore streaming options for Cassandra.

Streaming Options

Double-Writing

[Image: the writer sends two distinct writes, one to Cassandra and one to Kafka]

The idea is to publish to Kafka every time a write is performed on Cassandra. This double-writing could be performed via the built-in trigger mechanism or a custom wrapper around the client. There are performance problems with this approach. First, because we now need to write to two systems instead of one, write latency increases. More importantly, when a write to one system fails due to a timeout, it is indeterminate whether the write succeeded. To guarantee data consistency across both systems, we would have to implement distributed transactions, but multiple round trips for consensus would increase latency and reduce throughput further. This defeats the purpose of a high-write-throughput database.
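For illustration only, a client-side wrapper that performs the double write might look like the sketch below; the session and producer fields stand in for real Cassandra and Kafka clients, and the comment marks the window where the two systems can diverge.

import com.datastax.driver.core.Session;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DoubleWritingClientSketch {

    private final Session cassandraSession;                // DataStax driver session
    private final Producer<String, String> kafkaProducer;  // Kafka producer

    public DoubleWritingClientSketch(Session session, Producer<String, String> producer) {
        this.cassandraSession = session;
        this.kafkaProducer = producer;
    }

    public void write(String cql, String topic, String changeEventJson) {
        // Write 1: the source of truth.
        cassandraSession.execute(cql);

        // If the process crashes here, or the Kafka write below times out, Cassandra and
        // Kafka diverge and there is no cheap way to know whether the event was published.
        // Guaranteeing consistency would require a distributed transaction across both systems.

        // Write 2: the change event for downstream consumers.
        kafkaProducer.send(new ProducerRecord<>(topic, changeEventJson));
    }
}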

Kafka as Event Source

[Image: writes are sent to Kafka and then applied to the downstream database]

The idea is to write to Kafka rather than directly to Cassandra, and then apply the writes to Cassandra by consuming the events from Kafka. Event sourcing is a pretty popular approach these days. However, if you already have existing services writing directly to Cassandra, this approach would require a change in application code and a nontrivial migration. It also violates read-your-writes consistency: the requirement that if a process performs a write, a subsequent read by the same process must observe the write’s effects. Since writes are routed through Kafka, there will be a lag between when a write is issued and when it is applied; during this time, reads from Cassandra will return stale data. This may cause unforeseeable production issues.

Parsing Commit Logs

[Image: commit log events are sent to Kafka]

Cassandra introduced a change data capture (CDC) feature in 3.0 to expose its commit logs. Commit logs are write-ahead logs in Cassandra, designed to provide durability in case of machine crashes; they are typically discarded upon flush. With CDC enabled, they are instead transferred to a local CDC directory upon flush, where they can be read by other processes on the Cassandra node. This allows us to use the same CDC mechanism as in our MySQL streaming pipeline. It decouples production operations from analytics, and thus does not require additional work from application engineers.

Ultimately, after considering throughput, consistency, and separation of concerns, the final option – parsing commit logs – became the top contender.

Commit Log Deep Dive

Aside from exposing commit logs, Cassandra also provides the CommitLogReader and CommitLogReadHandler classes to help with the deserialization of the logs. It seems like the hard work has already been done, and what’s left is applying transformations: converting the deserialized representations into Avro records and publishing them to Kafka. However, as we dug further into the implementation of the CDC feature and of Cassandra itself, we realized there were many new challenges.

Delayed Processing

Commit logs only arrive in the CDC directory once a commit log segment is full and gets flushed. This implies there is a delay between when an event is logged and when it is captured. If few or no writes are being executed, the delay in event capturing could be arbitrarily long.

Space Management

In MySQL, you can set a binlog retention period such that logs are automatically deleted after that period. In Cassandra, however, there is no such option. Once commit logs are transferred to the CDC directory, a consumer must be in place to clean them up after processing. If the disk space consumed by the CDC directory exceeds a given threshold, further writes to the database will be rejected.

Duplicated Events

Commit logs on an individual Cassandra node do not reflect all writes to the cluster; they only reflect writes to the node. This makes it necessary to process commit logs on all nodes. But with a replication factor of N, N copies of each event are sent downstream.

Out-of-Order Events

Writes to an individual Cassandra node are logged serially as they arrive. However, these events may arrive out of order relative to when they were issued. Downstream consumers of these events must understand the event time and implement last-write-wins logic, similar to Cassandra’s read path, to arrive at the correct result.
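The last-write-wins resolution itself is straightforward. Below is a minimal sketch, assuming each downstream event carries its primary key and the client-side write timestamp (the Event shape is hypothetical); the same logic also collapses the duplicate copies produced by replication, since identical events resolve to a single winner per key.

import java.util.HashMap;
import java.util.Map;

public class LastWriteWinsSketch {

    // Hypothetical shape of a downstream event: primary key, client-side write timestamp, payload.
    record Event(String primaryKey, long writeTimestampMicros, String payload) {}

    /** Keep only the latest event per primary key, mirroring Cassandra's read-path resolution. */
    static Map<String, Event> resolve(Iterable<Event> events) {
        Map<String, Event> latest = new HashMap<>();
        for (Event e : events) {
            latest.merge(e.primaryKey(), e,
                    (current, incoming) ->
                            incoming.writeTimestampMicros() >= current.writeTimestampMicros()
                                    ? incoming : current);
        }
        return latest;
    }
}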

Out-of-Band Schema Change

Table schema changes are communicated via a gossip protocol and are not recorded in the commit logs. Therefore, schema changes can only be detected on a best-effort basis.

Incomplete Row Data

Cassandra does not perform a read before write; as a result, change events do not capture the state of every column, only the state of modified columns. This makes a change event less useful than if the full row were available.

Once we acquired a deep understanding of Cassandra commit logs, we re-assessed our requirements against the given constraints in order to design a minimum viable infrastructure.

Minimum Viable Infrastructure

Borrowing from the minimum viable product philosophy, we want to design a data pipeline with a minimum set of features and requirements to satisfy our immediate customers. For Cassandra CDC, this means:

  • Production database’s health and performance should not be negatively impacted by introducing CDC; slowed operations and system downtimes are much costlier than a delay in the analytics pipeline

  • Querying Cassandra tables in our data warehouse should match the results of querying the production database (barring delays); having duplicate and/or incomplete rows amplifies the post-processing workload for every end user

With these criteria in front of us, we began to brainstorm solutions, and ultimately came up with three approaches:

Stateless Stream Processing

This solution is inspired by Datastax’s advanced replication blog post. The idea is to deploy an agent on each Cassandra node to process local commit logs. Each agent is considered “primary” for a subset of writes based on partition keys, such that every event has exactly one primary agent. During CDC, in order to avoid duplicate events, each agent only sends an event to Kafka if it is the primary agent for that event. To handle eventual consistency, each agent sorts events into per-table, time-sliced windows as they arrive (but doesn’t publish them right away); when a window expires, the events in that window are hashed, and the hash is compared against those of the other nodes. If the hashes don’t match, data is fetched from the inconsistent node so the correct value can be resolved by last write wins. Finally, the corrected events in that window are sent to Kafka. Any out-of-order event beyond the time-sliced windows would have to be logged into an out-of-sequence file and handled separately. Since deduplication and ordering are done in memory, concerns about agent failover causing data loss, OOM issues impacting the production database, and the overall complexity of this implementation stopped us from exploring it further.

Stateful Stream Processing

This solution is the most feature-rich. The idea is that the agent on each Cassandra node processes commit logs and publishes events to Kafka without deduplication or ordering. A stream processing engine then consumes these raw events and does the heavy lifting (such as filtering out duplicate events with a cache, managing event order with event-time windowing, and capturing the state of unmodified columns by performing a read before write against a state store), and publishes these derived events to a separate Kafka topic. Finally, KCBQ is used to consume events from this topic and upload them to BigQuery. This approach is appealing because it solves the problem generically: anyone can subscribe to the latter Kafka topic without needing to handle deduplication and ordering on their own. However, it introduces a nontrivial amount of operational overhead; we would have to maintain a stream processing engine, a database, and a cache.

Processing-On-Read

Similar to the previous approach, the idea is to process commit logs on each Cassandra node and send events to Kafka without deduplication or ordering. Unlike the previous approach, the stream processing portion is eliminated entirely. Instead, the raw events are uploaded directly to BigQuery via KCBQ, and views are created on top of the raw tables to handle deduplication, ordering, and the merging of columns into complete rows. Because BigQuery views are virtual tables, the processing is done lazily each time the view is queried. To prevent the view queries from getting too expensive, the views are materialized periodically. This approach removes both operational complexity and code complexity by leveraging BigQuery’s massively parallel query engine. The drawback is that non-KCBQ downstream consumers must do all of this work on their own.

Given that our main purpose of streaming Cassandra is data warehousing, we ultimately decided to implement processing-on-read. It provides the essential features for our existing use case, and offers the flexibility to expand into the other two more generic solutions mentioned above in the future.

Open Source

While building a real-time data pipeline for Cassandra, we received a substantial amount of interest in this project. As a result, we decided to open-source the Cassandra CDC agent under the Debezium umbrella as an incubating connector. If you would like to learn more or contribute, check out the work-in-progress pull request for the source code and documentation.

In the second half of this blog post series, we will elaborate on the CDC implementation itself in more detail. Stay tuned!


Tutorial for Adding Sentry into Debezium Container Images

Debezium has received a huge improvement to the structure of its container images recently, making it extremely simple to extend its behaviour.

This is a small tutorial showing how you can, for instance, add Sentry, "an open-source error tracking [software] that helps developers monitor and fix crashes in real time". Here we’ll use it to collect and report any exceptions from Kafka Connect and its connectors. Note that this is only applicable to Debezium 0.9+.

We need a few things to get Sentry working; we’ll add all of them below and then write a Dockerfile which glues it all together correctly:

  • Configure Log4j

  • SSL certificate for sentry.io, since it’s not in the JVM trusted chain by default

  • The sentry and sentry-log4j libraries

Log4j Configuration

Let’s create a file config/log4j.properties in our local project, which is a copy of the one shipped with the Debezium images, and add Sentry to it. Note that we added Sentry to log4j.rootLogger and created the section log4j.appender.Sentry; the rest remains the same as the original configuration:

kafka.logs.dir=logs

log4j.rootLogger=INFO, stdout, appender, Sentry

# Disable excessive reflection warnings - KAFKA-5229
log4j.logger.org.reflections=ERROR

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.threshold=INFO
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p  %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext}  %m   [%c]%n

log4j.appender.appender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.appender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.appender.File=${kafka.logs.dir}/connect-service.log
log4j.appender.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.appender.layout.ConversionPattern=%d{ISO8601} %-5p  %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext}  %m   [%c]%n

log4j.appender.Sentry=io.sentry.log4j.SentryAppender
log4j.appender.Sentry.threshold=WARN

Sentry.io SSL certificate

Download the getsentry.pem file from sentry.io and put it in your project’s directory under ssl/.

The Dockerfile

Now we can glue everything together in our Debezium image:

  • Let’s first create a JKS file with our Sentry certificate; this uses a Docker multi-stage build, in which we generate a certificates.jks file that we later copy into our Kafka Connect with Debezium stage

  • Copy log4j.properties into $KAFKA_HOME/config/log4j.properties

  • Copy the JKS file from the multi-stage build

  • Set ENV with the Sentry version and md5sums

  • Download the Sentry dependencies. The docker-maven-download script is a helper which we ship by default in our images; in this case we’re using it to download a JAR file from Maven Central and put it in the Kafka libs directory. We do that by setting the ENV var MAVEN_DEP_DESTINATION=$KAFKA_HOME/libs:

FROM fabric8/java-centos-openjdk8-jdk:1.6 as ssl-jks

ARG JKS_STOREPASS="any random password, you can also set it outside via the arguments from docker build"

USER root:root

COPY /ssl /ssl

RUN chown -R jboss:jboss /ssl

USER jboss:jboss

WORKDIR /ssl

RUN keytool -import -noprompt -alias getsentry \
    -storepass "${JKS_STOREPASS}" \
    -keystore certificates.jks \
    -trustcacerts -file "/ssl/getsentry.pem"

FROM debezium/connect:0.10 AS kafka-connect

EXPOSE 8083

COPY config/log4j.properties "$KAFKA_HOME/config/log4j.properties"

COPY --from=ssl-jks --chown=kafka:kafka /ssl/certificates.jks /ssl/

ENV SENTRY_VERSION=1.7.23 \
    MAVEN_DEP_DESTINATION=$KAFKA_HOME/libs

RUN docker-maven-download \
        central io/sentry sentry "$SENTRY_VERSION" 4bf1d6538c9c0ebc22526e2094b9bbde && \
    docker-maven-download \
        central io/sentry sentry-log4j "$SENTRY_VERSION" 74af872827bd7e1470fd966449637a77

Build and Run

Now we can simply build the image:

$ docker build -t debezium/connect-sentry:1 --build-arg=JKS_STOREPASS="123456789" .

When running the image, we now have to configure our Kafka Connect application to load the JKS file by setting KAFKA_OPTS: -Djavax.net.ssl.trustStore=/ssl/certificates.jks -Djavax.net.ssl.trustStorePassword=<YOUR TRUSTSTORE PASSWORD>.

Sentry can be configured in many ways; I like to do it via environment variables. The minimum we can set is the Sentry DSN (which is necessary to point to your project) and the name of the actual running environment (e.g. production, staging).

In this case we can configure the variables: SENTRY_DSN=<GET THE DSN IN SENTRY’S DASHBOARD>, SENTRY_ENVIRONMENT=dev.

In case you’d like to learn more about using the Debezium container images, please check our tutorial.

And that’s it: a basic recipe for extending our Docker setup, using Sentry as an example; other modifications should be just as simple. For an example of how a RecordTooLarge exception from the Kafka producer looks in this setup, see the picture below:

[Image: Sentry exception example]

Conclusion

Thanks to the recent refactoring of the Debezium container images, it has become very easy to amend them with your own custom extensions. Downloading external dependencies and adding them to the images is now a trivial task, and we’d love to hear your feedback about it!

If you are curious about the refactoring itself, you can find the details in pull request debezium/docker-images#131.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Gitter, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.


Debezium 0.10.0.Beta2 Released

It’s my pleasure to announce the release of Debezium 0.10.0.Beta2!

This further stabilizes the 0.10 release line, with lots of bug fixes to the different connectors. 23 issues were fixed for this release; a couple of those relate to the DDL parser of the MySQL connector, e.g. around RENAME INDEX (DBZ-1329), SET NEW in triggers (DBZ-1331) and function definitions with the COLLATE keyword (DBZ-1332).

For the Postgres connector we fixed a potential inconsistency when flushing processed LSNs to the database (DBZ-1347). Also, the "include.unknown.datatypes" option now works as expected during snapshotting (DBZ-1335), and the connector won’t stumble over materialized views during snapshotting any longer (DBZ-1345).

The SQL Server connector will use much less memory in many situations (DBZ-1065), and it is now configurable whether it should emit tombstone events for deletions or not (DBZ-835). The same option was also added to the Oracle connector, bringing consistency across all the connectors.

Note that this release can be used with Apache Kafka 2.x, but not with 1.x. This was an unintentional change and compatibility with 1.x will be restored for the Beta3 release (the issue to track is DBZ-1361).

Please refer to the 0.10.0.Beta2 release notes to learn more about all resolved issues and the upgrading procedure.

Many thanks to everybody from the Debezium community who contributed to this release: Cheng Pan, Guillaume Rosauro, Mariusz Strzelecki and Stathis Souris.



Debezium Wears Fedora

The Debezium project strives to provide an easy deployment of connectors, so users can try and run connectors of their choice mostly by getting the right connector archive and unpacking it into the plug-in path of Kafka Connect.

This is true for all connectors except the Debezium PostgreSQL connector. This connector is special in that it requires a logical decoding plug-in to be installed inside the PostgreSQL source database(s) themselves. Currently, there are two supported logical decoding plug-ins:

  • postgres-decoderbufs, which uses Protocol Buffers as a very compact transport format and which is maintained by the Debezium community

  • wal2json, which is based on JSON and which is maintained by its own upstream community

These plug-ins can be consumed and deployed in two ways; the easiest one is to use one of our pre-made Postgres container images, which contain both plug-ins and are already configured as required. If you are using containers in your datacenter, and/or if you start a fresh database from scratch, then this can be a great option.

The other approach is building from source. Even though this is usually an easy task, it still raises a barrier to getting started and requires non-trivial knowledge of the Linux operating system.

To bridge the gap between those two extremes we’ve created and published an RPM package, available for Fedora 30 and later. By installing this package you will have the necessary binaries deployed, and the only task remaining is to configure PostgreSQL to enable the plug-in. The RPM is based on the latest stable Debezium release, 0.9.5.Final at this point.

Example

Let’s show how the package works. We will use the Vagrant tool as an easy way of firing up a pre-provisioned virtual machine with Fedora. Of course, that’s not a requirement, and the same steps apply to any other way of running Fedora.

Create and start a virtual machine with Fedora 30:

$ vagrant init fedora/30-cloud-base

A `Vagrantfile` has been placed in this directory. You are now
ready to `vagrant up` your first virtual environment! Please read
the comments in the Vagrantfile as well as documentation on
`vagrantup.com` for more information on using Vagrant.

$ vagrant up

Bringing machine 'default' up with 'virtualbox' provider...
.
.
.
==> default: Machine booted and ready!

Log into the virtual machine:

$ vagrant ssh

Install the PostgreSQL server and Protocol Buffers logical decoding plug-in:

$ sudo dnf -y install postgresql postgres-decoderbufs
.
.
.
Installed:
  postgres-decoderbufs-0.9.5-1.fc30.x86_64              postgresql-11.3-1.fc30.x86_64
  postgis-2.5.1-1.fc30.x86_64                           armadillo-9.400.4-1.fc30.x86_64
  blas-3.8.0-12.fc30.x86_64                             cairo-1.16.0-5.fc30.x86_64
  cups-libs-1:2.2.11-2.fc30.x86_64                      fontconfig-2.13.1-8.fc30.x86_64
  lapack-3.8.0-12.fc30.x86_64                           libgfortran-9.1.1-1.fc30.x86_64
  libpq-11.3-2.fc30.x86_64                              libquadmath-9.1.1-1.fc30.x86_64
  mariadb-connector-c-3.0.10-1.fc30.x86_64              mariadb-connector-c-config-3.0.10-1.fc30.noarch
  nss-3.44.0-2.fc30.x86_64                              nss-softokn-3.44.0-2.fc30.x86_64
  nss-softokn-freebl-3.44.0-2.fc30.x86_64               nss-sysinit-3.44.0-2.fc30.x86_64
  nss-util-3.44.0-2.fc30.x86_64                         poppler-0.73.0-9.fc30.x86_64
  postgresql-server-11.3-1.fc30.x86_64                  proj-5.2.0-2.fc30.x86_64
  proj-datumgrid-1.8-2.fc30.noarch                      uriparser-0.9.3-1.fc30.x86_64
  SuperLU-5.2.1-6.fc30.x86_64                           arpack-3.5.0-6.fc28.x86_64
  atk-2.32.0-1.fc30.x86_64                              avahi-libs-0.7-18.fc30.x86_64
  cfitsio-3.450-3.fc30.x86_64                           dejavu-fonts-common-2.37-1.fc30.noarch
  dejavu-sans-fonts-2.37-1.fc30.noarch                  fontpackages-filesystem-1.44-24.fc30.noarch
  freexl-1.0.5-3.fc30.x86_64                            fribidi-1.0.5-2.fc30.x86_64
  gdal-libs-2.3.2-7.fc30.x86_64                         gdk-pixbuf2-2.38.1-1.fc30.x86_64
  gdk-pixbuf2-modules-2.38.1-1.fc30.x86_64              geos-3.7.1-1.fc30.x86_64
  giflib-5.1.9-1.fc30.x86_64                            graphite2-1.3.10-7.fc30.x86_64
  gtk-update-icon-cache-3.24.8-1.fc30.x86_64            gtk2-2.24.32-4.fc30.x86_64
  harfbuzz-2.3.1-1.fc30.x86_64                          hdf5-1.8.20-6.fc30.x86_64
  hicolor-icon-theme-0.17-5.fc30.noarch                 jasper-libs-2.0.14-8.fc30.x86_64
  jbigkit-libs-2.1-16.fc30.x86_64                       lcms2-2.9-5.fc30.x86_64
  libXcomposite-0.4.4-16.fc30.x86_64                    libXcursor-1.1.15-5.fc30.x86_64
  libXdamage-1.1.4-16.fc30.x86_64                       libXfixes-5.0.3-9.fc30.x86_64
  libXft-2.3.2-12.fc30.x86_64                           libXi-1.7.9-9.fc30.x86_64
  libXinerama-1.1.4-3.fc30.x86_64                       libaec-1.0.4-1.fc30.x86_64
  libdap-3.20.3-1.fc30.x86_64                           libgeotiff-1.4.3-3.fc30.x86_64
  libgta-1.0.9-2.fc30.x86_64                            libjpeg-turbo-2.0.2-1.fc30.x86_64
  libkml-1.3.0-19.fc30.x86_64                           libspatialite-4.3.0a-11.fc30.x86_64
  libtiff-4.0.10-4.fc30.x86_64                          libwebp-1.0.2-2.fc30.x86_64
  netcdf-4.4.1.1-12.fc30.x86_64                         nspr-4.21.0-1.fc30.x86_64
  ogdi-3.2.1-4.fc30.x86_64                              openblas-0.3.5-5.fc30.x86_64
  openblas-openmp-0.3.5-5.fc30.x86_64                   openblas-serial-0.3.5-5.fc30.x86_64
  openblas-threads-0.3.5-5.fc30.x86_64                  openblas-threads64_-0.3.5-5.fc30.x86_64
  openjpeg2-2.3.1-1.fc30.x86_64                         pango-1.43.0-3.fc30.x86_64
  pixman-0.38.0-1.fc30.x86_64                           poppler-data-0.4.9-3.fc30.noarch
  protobuf-c-1.3.1-2.fc30.x86_64                        unixODBC-2.3.7-4.fc30.x86_64
  xerces-c-3.2.2-2.fc30.x86_64

Complete!

Next, initialize the database:

$ sudo /usr/bin/postgresql-setup --initdb

Now enable the plug-in in the database server configuration file /var/lib/pgsql/data/postgresql.conf by adding the following parameters:

# MODULES
shared_preload_libraries = 'decoderbufs'

# REPLICATION
wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8             # max number of walsender processes (change requires restart)
wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s       # in milliseconds; 0 disables
max_replication_slots = 4       # max number of replication slots (change requires restart)

Configure the security file /var/lib/pgsql/data/pg_hba.conf for the database user that will be used by Debezium (e.g. debezium) by adding these parameters:

local   replication     debezium                          trust
host    replication     debezium  127.0.0.1/32            trust
host    replication     debezium  ::1/128                 trust

Finally, restart PostgreSQL:

$ sudo systemctl restart postgresql

And that’s it: now we have a PostgreSQL database that is ready to stream changes to the Debezium PostgreSQL connector. Of course, the plug-in can also be installed into an already existing database (Postgres versions 9 and later), just by installing the RPM package and setting up the config and security files in the described way.

Outlook: pgoutput

While the decoderbufs plug-in is our recommended choice for a logical decoding plug-in, there are cases where you may not be able to use it. Most notably, you typically don’t have the flexibility to install custom plug-ins in cloud-based environments such as Amazon RDS.

This is why we’re currently exploring a third alternative to decoderbufs and wal2json, which is to leverage Postgres’ logical replication mechanism. There’s a built-in plug-in, pgoutput, based on this, which exists in every Postgres database since version 10. We’re still in the process of exploring the implications (and possible limitations) of using pgoutput, but so far things look promising, and it may eventually be a valuable tool to have in the box.

Stay tuned for more details coming soon!


