At Numberly, the Omnichannel Delivery team owns all the types of messages we support and operate for our clients, from the well-known and established email, to the still emerging Rich Communication Services (RCS) – and don’t forget the over-the-top (OTT) platforms such as WhatsApp.
The team recently got the chance to build a “platform to rule them all” with the goal of streamlining the way all our components send and track messages, whatever their form. The general logic is as follows: clients or programmatic platforms send messages or batches of messages through REST API gateways that are responsible for validating and rendering the message payload. Those gateways then all converge towards a Central Message Routing Platform that implements full-featured scheduling, accounting, tracing and, of course, routing of the messages using the right platform or operator connectors.
Moving from a dedicated platform per channel to one central platform
Looking at the Central Messaging Platform
High Constraints
Putting all your eggs in one basket is always risky, right? Making this kind of move places a lot of constraints on our platform requirements. First, it has to be very reliable: highly available and resilient, because it becomes a single point of failure for all our messages. Second, it has to scale fast to match the growth of one or multiple channels at once as our routing needs change.
Strong Guarantees
High availability and scale look easy when compared to our observability and idempotence requirements. When you imagine all your messages going through a single place, the ability to trace what happened to every single one of them, or to a group of them, becomes a real challenge. Even worse, one of the greatest challenges out there, especially in a distributed system, is the idempotence guarantee, which we had lacked so far on our other pipelines. Guaranteeing that a message cannot be sent twice is easier said than done.
Design Thinking and Key Concepts
We split up our objectives into three main concepts that we promised to strictly respect to keep up with the constraints and guarantees of our platform.
Reliability
- Simple: few share-(almost?)-nothing components
- Low coupling: keep remote dependencies to a minimum
- Coding language: efficient with explicit patterns and strict paradigms
Scale
- Application layer: easy to deploy and scale with strong resilience
- Data bus: a high-throughput, highly resilient, horizontally scalable message bus with time- and order-preserving capabilities
- Data querying: low-latency, one-or-many query support
Idempotence
- Processing isolation: workload distribution should be deterministic
Architecture Considerations
The Possible Default Choice
Considering Numberly’s stack, the first go-to architecture could have been something like this:
- Application layers running on Kubernetes
- Kafka as a message-passing bus from Gateway APIs
- Kafka as a log of messages to be processed and sent
- ScyllaDB as a storage layer to query the state of individual messages or groups of messages
- Redis as a hot cache for some optimizations
- Kafka as a messaging bus between our Central Message Routing Platform and the individual channel routing agents
On paper, it sounds like a solid and proven design, right?
A Not-So-Default Choice After All
This apparently simple go-to architecture has caveats that break too many of the concepts we promised to stick with.
Reliability
High availability with low coupling: We would rely on, and have to design our reliability around, three different data technologies, each of which could fail for different reasons that our platform logic would have to handle.
Scalability
While we are lucky to have a data technology matching each scalability constraint we set, the combination of the three does not match our reliability + idempotence requirements. Their combination adds too much complexity and too many points of failure to be efficiently implemented together:
- Easy to deploy: Kubernetes would do the job all right.
- Data horizontal scaling: While ScyllaDB would scale for sure, Kafka scaling with its partitions logic requires caution, and Redis does not scale that well out of the box.
- Data low-latency querying: ScyllaDB and Redis are the clear winners here, while Kafka is obviously not designed to “query” a piece of data easily.
- Data-ordered bus: That’s where Kafka excels, and where Redis exposes a queuing capability that scales hazardously at best. ScyllaDB, on the other hand, might be able to act as an ordered bus if we give it some thought.
Idempotence
As expected, idempotence becomes a nightmare when you imagine achieving it on such a complex ecosystem mixing many technologies.
Deterministic workload distribution: Can you achieve it when combining ScyllaDB + Kafka + Redis?
The Daring Architecture: Replacing Kafka with ScyllaDB
So, we decided to be bold and make a big statement: we’ll only use ONE data technology to hold everything together! ScyllaDB was the best suited to face the challenge:
- It’s highly available
- It scales amazingly
- It offers ridiculously fast queries for both single and range queries
This means that ScyllaDB can also be thought of as a distributed cache, effectively replacing Redis. Now replacing Kafka as an ordered-data bus is not so trivial using ScyllaDB, but it seems doable. The biggest question still on our plate was, “How can we get a deterministic workload distribution, if possible, for free?” That’s where I got what turned out to be a not-so-crazy idea after all: “What if I used ScyllaDB’s shard-per-core architecture inside my own application?”
Let’s take a quick detour and explain ScyllaDB shard-per-core architecture.
ScyllaDB Shard-Per-Core Architecture
ScyllaDB’s low-level design uses a shard-per-core architecture to deterministically distribute and process data. The main idea is that the partition key in your data table design determines not only which node is responsible for a copy of the data, but also which CPU core gets to handle its I/O processing.
You got it right: ScyllaDB distributes the data in a deterministic fashion down to a single CPU core.
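To make this concrete, here is a minimal Rust sketch of that token-to-shard mapping, reconstructed from ScyllaDB’s publicly documented sharding algorithm. The function name is ours, not a driver API; nr_shards and msb_ignore are per-node settings that the cluster advertises and the driver exposes.

```rust
/// Minimal sketch of ScyllaDB's token-to-shard mapping.
/// `token` is the Murmur3 token of a partition key, `nr_shards` the number
/// of CPU cores (shards) on the owning node, and `msb_ignore` the node's
/// "sharding ignore MSB" setting.
fn shard_of(token: i64, nr_shards: u32, msb_ignore: u8) -> u32 {
    // Shift the signed Murmur3 token into an unsigned, zero-based range.
    let mut biased = (token as u64).wrapping_add(1u64 << 63);
    // Discard the most significant bits that sharding ignores.
    biased <<= msb_ignore;
    // Scale the remaining range onto [0, nr_shards): this is the CPU core
    // that owns the token on each replica node.
    (((biased as u128) * (nr_shards as u128)) >> 64) as u32
}

fn main() {
    // A made-up Murmur3 token, as if computed from a message partition key.
    let token: i64 = -7509452495886106294;
    println!("owning shard = {}", shard_of(token, 16, 12));
}
```

This is the same computation a shard-aware driver performs to route each query directly to the right CPU core of the right node.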
So, my naive idea was to distribute our messaging platform processing using the exact same logic as ScyllaDB:
The expected effect would be to actually align ScyllaDB’s per-CPU core processing with our application’s and benefit from all the latency/scaling/reliability that comes with it.
The 100% Shard-Aware Application
That’s how we effectively created a 100% shard-aware application. It brings amazing properties to the table:
- Deterministic workload distribution
- Super-optimized data processing capacity aligned from the application to the storage layer
- Strong latency and isolation guarantees per application instance (pod)
- Infinite scale following ScyllaDB’s own ability to grow seamlessly
Building a Shard-Aware Application
Selecting the Right Programming Language
Now that we got our architecture inspiration, it was time to answer the perpetual question: “Which language to use?”
- We need a modern language that reflects our desire to build a reliable, secure and efficient platform.
- The shard calculation algorithm requires fast hashing capabilities and a great low-level synergy with the ScyllaDB driver.
Once we established that, Rust was a no-brainer.
The Deterministic Data Ingestion
Incoming messages are handled by a component that we call the ingester. For each message we receive, after the usual validations, we calculate the shard to which the message belongs as it will be stored in ScyllaDB. For this, we use the ScyllaDB Rust driver’s internal functions (to which we contributed).
More precisely, from our message partition key we compute a partition key that matches ScyllaDB’s storage replica nodes and CPU core, effectively aligning our application’s processing with ScyllaDB’s CPU cores.
Once this partition key is calculated to match ScyllaDB’s storage layer, we persist the message with all its data in the message table, and at the same time add its metadata to a table named “buffer” with the calculated partition key.
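As an illustration only, the dual write could look roughly like the sketch below. The table names, columns and the pre-computed shard argument are hypothetical, and the snippet assumes the scylla crate’s 0.x API (plus the uuid crate), which may differ from the driver version you use.

```rust
use scylla::batch::Batch;
use scylla::Session;
use uuid::Uuid;

/// Sketch of the ingester's dual write: the full message goes to the
/// `message` table, while its metadata goes to the `buffer` time series,
/// whose partition key (channel, shard) was computed to land on the same
/// ScyllaDB replicas and CPU core as the message itself.
async fn ingest_message(
    session: &Session,
    channel: &str,
    // Shard computed from the message partition key using the driver's
    // sharding information (see the token-to-shard sketch above).
    shard: i32,
    message_id: Uuid,
    payload: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // In real code these statements would be prepared once at startup.
    let insert_message = session
        .prepare("INSERT INTO message (channel, message_id, payload) VALUES (?, ?, ?)")
        .await?;
    let insert_buffer = session
        .prepare(
            "INSERT INTO buffer (channel, shard, ts, message_id) \
             VALUES (?, ?, toTimestamp(now()), ?)",
        )
        .await?;

    // Group both writes so a message never ends up in one table only.
    let mut batch: Batch = Default::default();
    batch.append_statement(insert_message);
    batch.append_statement(insert_buffer);

    session
        .batch(
            &batch,
            (
                (channel, message_id, payload),
                (channel, shard, message_id),
            ),
        )
        .await?;
    Ok(())
}
```

Whether to group the two inserts in a batch or issue them independently is a trade-off we won’t settle here; the point is simply that the buffer row carries the shard-aligned partition key.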
The Deterministic Data Processing
That’s how the data is stored in ScyllaDB. Now, let’s talk about the second component, which we call “schedulers.” Schedulers will consume the ordered data from the buffer table and effectively proceed with the message-routing logic. Following the shard-to-component architecture, a scheduler will exclusively consume the messages of a specific shard just like a CPU core is assigned to a slice of ScyllaDB data.
A scheduler will fetch a slice of the data that it is responsible for from the buffer table.
At this point, a scheduler will have the IDs of the messages it should process. It then fetches the message details from the message table.
The scheduler then processes the message and sends it to the channel it is responsible for.
Each component of the platform is responsible for a slice of messages per channel by leveraging ScyllaDB’s shard-aware algorithm. We obtain 100% aligned data processing from the application’s perspective down to the database.
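To give an idea of the shape of those two reads, here they are as CQL strings in Rust constants; the table and column names are hypothetical, not our actual schema.

```rust
// 1. The metadata slice owned by one scheduler: the (channel, shard)
//    partition key guarantees that exactly one scheduler, one set of
//    ScyllaDB replicas and one CPU core handle this data, and the time
//    range bounds each fetch iteration.
const FETCH_BUFFER_SLICE: &str = "SELECT message_id, ts FROM buffer \
     WHERE channel = ? AND shard = ? AND ts > ? AND ts <= ?";

// 2. The full message details, looked up by primary key in the message
//    table for each ID returned by the buffer query.
const FETCH_MESSAGE_DETAILS: &str =
    "SELECT payload FROM message WHERE channel = ? AND message_id = ?";

fn main() {
    // In the real platform these would be prepared once and executed by
    // each scheduler; printing them just keeps the sketch self-contained.
    println!("{FETCH_BUFFER_SLICE}\n{FETCH_MESSAGE_DETAILS}");
}
```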
Replacing Kafka with ScyllaDB
As mentioned, replacing Kafka as an ordered-data bus with ScyllaDB is not trivial, but it is surely doable. Let’s get a deeper view into how it works from the scheduler component’s perspective.
We store messages’ metadata as a time series in the buffer table, ordered by ScyllaDB’s time of ingestion (this is an important detail). Each scheduler keeps a timestamp offset of the last message it successfully processed. This offset is stored in a dedicated table. When a scheduler starts, it fetches the timestamp offset of the shard of data it is assigned to.
A scheduler is an infinite loop that fetches the messages it is assigned to within a certain, configurable time window. In fact, a scheduler doesn’t fetch data starting strictly from the last timestamp offset, but from a slightly older one. That does mean a single message will be fetched multiple times, but this is handled by our idempotence business logic and optimized by an in-memory cache. Overlapping the previous time range allows us to avoid missing any message due to a potential write latency or subtle time skew between nodes, since we rely on ScyllaDB’s timestamps.
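Putting it together, a single shard’s scheduler loop could be sketched as follows. Everything here is hypothetical scaffolding (the stub functions stand in for the real ScyllaDB queries and routing logic, and an async runtime such as Tokio is assumed); the two points that matter are the persisted per-shard timestamp offset and the deliberate overlap of the fetch window.

```rust
use std::time::Duration;

/// Metadata row read from the buffer table (epoch milliseconds in this sketch).
#[allow(dead_code)] // message_id is used by the real detail lookup, not by these stubs
struct MessageMeta {
    message_id: u64,
    ts_ms: i64,
}

// Stubs standing in for the real ScyllaDB reads/writes and routing logic.
async fn load_offset(_channel: &str, _shard: u32) -> i64 { 0 }
async fn store_offset(_channel: &str, _shard: u32, _offset_ms: i64) {}
async fn fetch_buffer_slice(_c: &str, _s: u32, _from: i64, _to: i64) -> Vec<MessageMeta> { Vec::new() }
async fn already_sent(_meta: &MessageMeta) -> bool { false } // idempotence check (cache + table)
async fn route_message(_meta: &MessageMeta) {}

async fn run_scheduler(channel: &str, shard: u32) {
    // Resume from the last successfully processed timestamp for this shard.
    let mut offset_ms = load_offset(channel, shard).await;

    // Re-read a little behind the offset to tolerate write latency and small
    // clock skew between nodes; duplicates are filtered by the idempotence logic.
    const OVERLAP_MS: i64 = 5_000;
    const WINDOW_MS: i64 = 10_000;

    loop {
        let from = offset_ms - OVERLAP_MS;
        let to = offset_ms + WINDOW_MS;

        for meta in fetch_buffer_slice(channel, shard, from, to).await {
            if !already_sent(&meta).await {
                route_message(&meta).await;
            }
            // Only advance the offset once the message has been handled.
            offset_ms = offset_ms.max(meta.ts_ms);
        }
        store_offset(channel, shard, offset_ms).await;

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    run_scheduler("rcs", 0).await;
}
```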
Retrospective
Reaching our goal was not easy. We failed many times, but we finally made it and proved that our original idea not only worked, but was also convenient to work with while being amazingly efficient.
What We Learned
The first thing we want to emphasize is that load testing is more than useful. Early during development, we set up load tests sending tens of thousands of messages per second. Our goal was to test our data schema design at scale and to validate our idempotence guarantee. It allowed us to spot multiple issues, sometimes non-trivial ones (like when the execution delay between the statements of our insertion batch was greater than our fetch time window). Yeah, a nightmare to debug…
By the way, our first workload was a naive insert-and-delete, and load testing made large partitions appear very fast.
Fortunately, we also learned about compaction strategies, especially the Time-Window Compaction Strategy, which we are using now. Buffering messages as a time series, combined with TWCS, allowed us to get rid of the large-partition issue.
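As an example of what such a time-series buffer could look like, here is a hypothetical DDL (not our production schema) combining a per-shard partition key, a TTL and the Time-Window Compaction Strategy:

```rust
// Hypothetical DDL for the buffer time series: partitioned per (channel,
// shard), clustered by ingestion time, expired by TTL and compacted with
// TWCS so old windows are dropped cheaply instead of piling up.
const CREATE_BUFFER_TABLE: &str = "
    CREATE TABLE IF NOT EXISTS buffer (
        channel    text,
        shard      int,
        ts         timestamp,
        message_id uuid,
        PRIMARY KEY ((channel, shard), ts, message_id)
    ) WITH CLUSTERING ORDER BY (ts ASC, message_id ASC)
      AND default_time_to_live = 86400
      AND compaction = {
        'class': 'TimeWindowCompactionStrategy',
        'compaction_window_unit': 'HOURS',
        'compaction_window_size': 1
      }";

fn main() {
    // In the real platform this statement would be executed once at startup.
    println!("{CREATE_BUFFER_TABLE}");
}
```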
We Contributed to the ScyllaDB Rust Driver
To make this project possible, we contributed to the ScyllaDB ecosystem, especially to the Rust driver, with a few issues and pull requests. For example, we added code to compute the replica nodes of a primary key, as we needed it to compute the shard of a message:
- optimize PreparedStatement::compute_partition_key
- add ClusterData::get_endpoints
- connection_pool/node: expose node sharder
- session: add keyspaces_to_fetch configuration
- zero-copy and lazy rows deserialization enhancement
We hope these contributions will help you if you want to use this cool sharding pattern in your own shard-aware application at some point.
We also discovered some ScyllaDB bugs, so of course we worked with ScyllaDB support to have them fixed (thanks for your responsiveness).
What We Wish We Could Do
As in all systems, not everything is perfect, and there are some things we wish we could do better. Obviously, ScyllaDB is not a message queuing platform, and we miss Kafka’s long polling. Currently, our architecture polls each shard buffer at regular intervals, which consumes a lot of bandwidth for nothing. But we are working on optimizing this.
We also encountered some memory issues that we suspected were related to the ScyllaDB Rust driver. We didn’t spend much time investigating, but it led us to dig into the driver code, where we spotted a lot of memory allocations.
As a side project, we started to think about some optimizations; actually, we did more than think, because we wrote a whole prototype of an (almost) allocation-free ScyllaDB Rust driver.
We may make it the subject of a future article, with the Rust driver outperforming the Go driver once again.
Going Further With ScyllaDB Features
So we bet on ScyllaDB, and that’s a good thing, because it has a lot of other features we want to benefit from. For example, change data capture (CDC): using the CDC Kafka source connector, we could stream our message events to the rest of the infrastructure without touching our application code. Observability made easy. We are also looking forward to ScyllaDB’s path toward strongly consistent tables with Raft as an alternative to lightweight transactions (LWT). We currently use LWT in a few places, especially for dynamic shard workload attribution, so we can’t wait to test this feature!
This Breakthrough Design Has Been Awarded
We are very proud to have won the ScyllaDB Innovation Award: Top Technical Achievement for this work and results. The design is now in production at Numberly. Feel free to get in touch with us if you want to know more, or even better, if you’d like to join one of our amazing tech teams.