This post originally appeared on the WePay Engineering blog.
In the first half of this blog post series, we explained our decision-making process of designing a streaming data pipeline for Cassandra at WePay. In this post, we will break down the pipeline into three sections and discuss each of them in more detail:
Cassandra to Kafka with CDC agent
Kafka with BigQuery with KCBQ
Transformation with BigQuery view
Cassandra to Kafka with CDC Agent
The Cassandra CDC agent is a JVM process that is intended to be deployed on each node in a Cassandra cluster. The agent is comprised of several interdependent processors, running concurrently and working together to publish change events to Kafka.
This processor is responsible for bootstrapping new tables. It looks up the CDC configuration to determine the snapshot mode, and performs snapshot on CDC-enabled tables if needed. To snapshot a table, the agent performs a full table scan and converts each row in the result set into an individual create event, and then sequentially enqueues them to an in-memory BlockingQueue.
Commit Log Processor
This processor is responsible for watching the CDC directory for new commit logs, parsing the commit log files via Cassandra’s
CommitLogReader, transforming deserialized mutations into standardized change events, and finally enqueuing them to the same queue as the snapshot processor.
At this point, some readers may have concerns in regard to running Snapshot Processor and Commit Log Processors concurrently rather than serially. The reason is that Cassandra uses a client-side timestamp to determine event order, and resolves conflicts with last write wins. This client-side timestamp is deliberately stored in each change event. This is why snapshotting doesn’t have to proceed commit log processing – the ordering is determined later on when the data is queried in the data warehouse.
This processor is responsible for dequeuing change events, transforming them into Avro records, and sending them to Kafka via a Kafka producer. It also tracks the position of the most recently sent event, so that on restart it is able to pick up from where it left off.
Implementing an in-memory queue in the CDC agent seems like overkill at first. Given there is only a single thread doing the enqueue and another thread doing the dequeue, the performance boost is negligible. The motivation here is to decouple the work of parsing commit logs, which should be done serially in the right order, from the work of serializing and publishing Kafka events, which can be parallelized by multiple threads for different tables. Although such parallelization is not implemented at the moment, we want the flexibility of adding this feature in the near future.
Some may also wonder why Kafka Connect is not used here as it seems like a natural fit for streaming. It is a great option if we wanted distributed parallel processing with fault tolerance. However, it is more complicated to deploy, monitor, and debug than a Kafka producer. For the purpose of building a minimum viable infrastructure, we chose Kafka producer at the time.
In order to support automatic schema evolution, this processor periodically polls the database for the latest table schema, and updates the in-memory schema cache if a change is detected. Snapshot Processor and Commit Log Processor both look up table schema from this cache and attach it as part of the change event prior to enqueue. Then upon dequeue, the Queue Processor transforms the attached table schema into an Avro schema for record serialization.
Commit Log Post Processor
Kafka to BigQuery with KCBQ
Transformation with BigQuery View
Once the events are in BigQuery, this is where the heavy-lifting is being done. We create virtual views on top of the raw tables to merge the data in a way that mirrors the source table in Cassandra. Note that each row in the raw tables contains limited data – only columns that have been modified have states. This means selecting the latest row for each primary key will not provide us with data that is consistent with source. Instead, the query must identify the latest cell in each column for each primary key. This can be achieved with self-joins on the primary key for each column in the table. Although joins are slow in MySQL, BigQuery’s parallel execution engine and columnar storage makes this possible. A view on top of a 1TB Cassandra table in BigQuery takes about 100 seconds to query.
The fact that the BigQuery view is virtual implies each time the view is queried essentially triggers a full compaction of the raw data. This means the cost will go up with the number of queries, not to mention the duplicated events amplifies the amount of data that needs to be processed by a factor of N, where N is the replication factor. To save cost and improve performance, periodic compaction by materializing the view is necessary.
Future Development Work
Support for Cassandra 4.0
In Cassandra 4.0, the improved CDC feature allows the connector to be able to parse events in real-time as they are written rather than in micro-batches on each commit log flush. This reduces latency substantially.
As mentioned earlier, there is a single thread responsible for dequeuing, serializing, and publishing Kafka records. However, as the write throughput increases, if the performance of the agent does not keep up, it would result in a backlog of unprocessed commit logs which could potentially impact the health of our production database. The next step is to leverage parallel processing of events to optimize performance.
Streamline with Debezium and Kafka Connect
We initially built the Cassandra CDC agent as a standalone project. Now that it is open-sourced as a Debezium connector, we can replace some of our custom classes with existing ones in Debezium. Another improvement is to support common features that all Debezium connectors have, such as support for multiple serialization formats. Finally, the CDC agent is not fault tolerant; robust alert and monitoring are required as part of deployment. One area to explore in the future is to build the CDC agent on top of Kafka Connect as a source connector, this further streamlines the Cassandra connector with other Debezium connectors, and provides scalability and fault tolerance for free.
Cassandra being a peer-to-peer distributed database poses some really interesting challenges for CDC that do not exist in relational databases like MySQL and Postgres, or even a single-master NoSQL database like MongoDB. Note that it is worth evaluating the limitations before rolling out your own real-time data pipeline for Cassandra.
Besides understanding Cassandra internals, we learned a few lessons on engineering productivity along the way:
Minimum Viable Product Philosophy
By stripping away all features except for the essentials, we were able to build, test, and deploy a working solution in a reasonable time with limited resources. Had we aimed to design a pipeline that encompasses all features upfront, it would have taken a lot longer and required much more resources.
Cassandra is an open-source project. Rather than tackling the problem solo, we were engaged with the Cassandra community from the very start (i.e. sharing experiences with committers and users via meetups, discussing proposals in mailing list, presenting proof-of-concept in conferences, etc.); all of which provided us with valuable feedback throughout the design and implementation stages.
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve ours existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.