Distributed systems are usually optimized with particular workloads in mind. At the same time, the system should still behave in a sane way when the assumptions about workload do not hold - notably, one user shouldn't be able to ruin the whole system's performance. Buggy parts of the system can be a source of the overload as well, so it is worth considering overload protection on a per-component basis. For example, ScyllaDB's shared-nothing architecture gives it great scalability, but at the same time makes it prone to a "hot partition" problem: a single partition accessed with disproportionate frequency can ruin performance for other requests handled by the same shards. This talk will describe how we implemented rate limiting on a per-partition basis which reduces the performance impact in such a case, and how we reduced the CPU cost of handling failed requests such as timeouts (spoiler: it's about C++ exceptions).
I would like to tell you about how we at ScyllaDB implemented per-partition rate limiting, an overload protection feature designed to mitigate the negative effects of some pathologies in workloads. The talk will explain the problem this feature addresses and how the feature works.
This data partitioning scheme works well when the data and load are evenly distributed across all partitions in the cluster. However, a problem can occur when one partition suddenly starts receiving a disproportionate amount of traffic compared to other partitions. Because ScyllaDB assigns only a part of its computing resources to each partition, the shards on replicas responsible for that partition can easily become overloaded. The architecture doesn’t really allow other shards or nodes to come to the rescue, so even a powerful cluster will struggle to serve requests to that partition. We call this situation a hot partition problem.

What’s worse is that the negative effects aren’t restricted to a single partition. Each shard is responsible for handling many partitions, so if one of them becomes hot, then the partitions which share at least a single replica shard will be affected too.

Sometimes hot partitions appear because the cluster’s schema does not fit your data. You need to be aware of how your data is distributed and how it is accessed and used, then model it appropriately so that it fits ScyllaDB’s strengths and avoids its weaknesses. Unfortunately, the responsibility lies with the system designer: ScyllaDB won’t repartition the data automatically for you.

While it makes sense to optimize your data layout for the common case, the uncommon case will inevitably happen. For example, your users will not always behave as expected. It’s very important that their behavior does not bring the whole system to a grinding halt. What’s more, the source of the problem may originate not only from outside of your system but also from the inside. No matter how well we test our code, bugs do happen and may cause some parts of the system to go crazy, leading to highly unbalanced workloads. Therefore, it’s important to think about overload protection not only at the system level but at the per-component level as well.

The core of the issue with hot partitions is that requests arrive at a greater rate than the replica shards are able to process them. The requests form a queue which keeps growing. As the queue grows, requests need to wait longer and longer, ultimately reaching a point where most of them fail because they time out before they even start being processed. The system has good throughput because it accepts a lot of work, but poor goodput, as most of the work will ultimately be wasted.

The problem can be mitigated by rejecting some of the requests when we have reasons to believe that we won’t be able to process all of them. Requests should be rejected as early as possible and as cheaply as possible in order to leave the most computing resources for the remaining requests. While it is not always possible to correctly estimate the impact a single query will have on the replicas, we can apply some heuristics to decide whether we will be able to handle it. Let’s say that we don’t expect more than X writes per second to any particular partition.

In ScyllaDB 5.1 we implemented per-partition rate limiting, which follows that idea. For a given table, you are able to specify a per-partition rate limit, separately for reads and writes. If the cluster detects that reads or writes for a given partition start exceeding the user-defined limit, it will start rejecting some of the requests in an attempt to keep the rate of unrejected requests below the limit.

While the feature tries to accurately apply the defined limits, due to the distributed nature of ScyllaDB the actual rate of accepted requests may be higher. Keep in mind that this feature is not meant for enforcing strict limits, for example for API purposes, but was rather designed as an overload protection mechanism. The inaccuracies are due to the details of the implementation and will be explained in the next sections.
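To make the throughput versus goodput distinction concrete, here is a minimal sketch of a toy queue model. It is not ScyllaDB code: the `simulate` function, its parameters, and the tick-based timing are all assumptions made for illustration. It models one shard that serves requests in FIFO order and still spends a service slot on requests that have already timed out.

```cpp
#include <cstdint>
#include <deque>

struct sim_result {
    uint64_t accepted = 0;   // requests admitted into the queue (throughput)
    uint64_t succeeded = 0;  // requests served before their timeout (goodput)
};

// Hypothetical toy model of a single shard. Each tick, `arrivals` requests
// show up and the shard serves up to `capacity` requests in FIFO order.
// A request that has waited `timeout_ticks` or longer still consumes a
// service slot, but counts as failed. A negative `accept_limit` disables
// admission control; otherwise at most `accept_limit` requests are admitted
// per tick and the rest are rejected before they are queued.
sim_result simulate(int ticks, int arrivals, int capacity,
                    int timeout_ticks, int accept_limit) {
    std::deque<int> queue;  // arrival tick of every queued request
    sim_result r;
    for (int now = 0; now < ticks; ++now) {
        int admitted = (accept_limit < 0 || arrivals < accept_limit)
                           ? arrivals
                           : accept_limit;
        for (int i = 0; i < admitted; ++i) {
            queue.push_back(now);
            ++r.accepted;
        }
        for (int s = 0; s < capacity && !queue.empty(); ++s) {
            int arrived = queue.front();
            queue.pop_front();
            if (now - arrived < timeout_ticks) {
                ++r.succeeded;
            }
        }
    }
    return r;
}
```

Calling `simulate(100, 20, 10, 5, -1)` admits every request, yet after the first few ticks everything the shard serves has already timed out. Calling `simulate(100, 20, 10, 5, 10)` rejects half of the arrivals up front and completes all of the requests it admits.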
So now I will explain how this feature works. In order to know how many requests to reject, we need to estimate the request rate for each partition. Because partitions are replicated on multiple nodes and not all replicas are usually required to perform an operation, there is no obvious single place to track the rate. Instead, we perform measurements on each replica separately and use them to estimate the rate.

Let’s zoom in on a single replica and see what is actually being measured. Each shard keeps a map of integer counters indexed by token, table, and operation type. When a partition is accessed, the counter relevant to the operation is increased by one. On the other hand, every second we cut every counter in half, rounding towards zero. Due to this, it is a simple mathematical exercise to show that if a partition has a steady rate of requests, then the counter value will eventually oscillate between X and 2 times X, where X is the rate of requests per second.

We managed to implement the counter map as a hash map that uses a fixed, statically allocated region of memory that is independent of the number of tables or the size of the data set. We employ several tricks to make it work without losing too much accuracy. Unfortunately, due to the time constraints I won’t be able to explain them here, but feel free to look at the implementation under the link on the slide.

Now let’s see how these counters are used in the context of the whole operation. Let’s start with a case where the coordinator is also a replica. This happens almost always when an application uses a properly configured shard-aware driver. Because the coordinator is a replica, it has direct access to one of the counters for the partition relevant to the operation. We let the coordinator make a decision based on that counter. If it decides to accept the operation, it increments its counter and tells the other replicas to do so as well. If it decides to reject, then the counter is only incremented locally and the replicas are not contacted at all. Although skipping the replicas causes some undercounting, the error isn’t very big in practice. On the other hand, we save some processing time and network resources.
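The counter scheme described above (increment on access, halve every second) can be sketched as follows. This is only an illustration: the `shard_counters` class, the `counter_key` layout, and the use of `std::map` are assumptions, whereas the real implementation is described as a fixed-size hash map. The `steady_state` helper drives one counter at a constant rate so the oscillation between roughly X and 2X can be observed.

```cpp
#include <cstdint>
#include <map>
#include <tuple>
#include <utility>

enum class op_type { read, write };

// Hypothetical key: the talk only says that counters are indexed by token,
// table, and operation type. The concrete types here are assumptions.
using counter_key = std::tuple<uint64_t, int64_t, op_type>;  // table id, token, op

class shard_counters {
    // Stand-in for the fixed-size hash map mentioned in the talk.
    std::map<counter_key, uint32_t> _counters;
public:
    // Called on every access to a partition; returns the new counter value.
    uint32_t increment(const counter_key& key) { return ++_counters[key]; }

    uint32_t value(const counter_key& key) const {
        auto it = _counters.find(key);
        return it == _counters.end() ? 0 : it->second;
    }

    // Called once per second; unsigned division rounds towards zero.
    void halve_all() {
        for (auto& entry : _counters) {
            entry.second /= 2;
        }
    }
};

// Applies `rate` increments per second for `seconds` seconds and returns
// {value right after the last halving, value right before the last halving}.
std::pair<uint32_t, uint32_t> steady_state(uint32_t rate, int seconds) {
    shard_counters counters;
    const counter_key key{1, 42, op_type::write};
    uint32_t peak = 0;
    for (int s = 0; s < seconds; ++s) {
        for (uint32_t i = 0; i < rate; ++i) {
            counters.increment(key);
        }
        peak = counters.value(key);
        counters.halve_all();
    }
    return {counters.value(key), peak};
}
```

Without rounding, the value after halving converges to the fixed point of c = (c + X) / 2, which is c = X, so the value just before halving converges to 2X. With integer rounding the sketch settles one step below those values.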
When the coordinator is not a replica, it doesn’t have direct access to any of the counters. We cannot let the coordinator ask a replica for a counter because it would introduce additional latency, which is unacceptable for a mechanism that is supposed to be low cost. Instead, the coordinator proceeds with sending the request to replicas and lets them decide whether to accept or reject the operation. Unfortunately, we cannot guarantee that all replicas will make the same decision. If they don’t, then, well, it’s not the end of the world, but some work will go to waste.

However, we try to reduce the chance of that happening. First, cutting counters in half is scheduled to happen at the start of every second according to the system clock. ScyllaDB depends on an accurate system clock, so cutting counters on each node should be synchronized relatively well. Assuming this, and that nodes accept a similar, steady rate of replica work, the counter values should be close to each other most of the time. Second, the coordinator chooses a random number from the range from 0 to 1 and sends it along with the request. Each replica computes the probability of rejection, which is calculated based on the counter value, and rejects if the number sent by the coordinator is below it. Because counters are supposed to be close together, replicas will usually agree on the decision.

There’s one important difference in measurement accuracy between reads and writes that is worth mentioning. When a write operation happens, the coordinator asks all live replicas to perform the operation. The consistency level only affects the number of replicas that the coordinator is supposed to wait for. All live replicas increment their counters for every write operation, so our calculations are not affected. However, in the case of reads, the consistency level also affects the number of replicas that are being contacted. It can lead to the read counters being incremented fewer times than in the case of writes. For example, with replication factor 3 and QUORUM consistency level, only two out of three replicas will be contacted for each read, so the cluster will think that the real read rate is proportionally smaller, which can lead to a higher rate of requests being accepted than the user limit allows. We found this acceptable for an overload prevention solution, where it’s more important to prevent the cluster performance from collapsing rather than enforcing some strict quota.

Initially, after coding the solution, we found that performance wasn’t as good as expected. In a local benchmark we actually achieved the opposite effect: rejected operations took more CPU than the successful ones. It was very strange, because tracking per-partition hit counts wasn’t expected to be compute intensive. It turned out that the problem was mostly related to how ScyllaDB reports failures, namely via C++ exceptions.
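The replica-side decision with a coordinator-supplied random number, and the read undercounting described above, can be sketched like this. The talk does not give the actual probability formula, so `rejection_probability` below is an assumption chosen only because it rejects nothing at or below the limit and rejects more as the counter grows. All function names are hypothetical.

```cpp
#include <cmath>
#include <cstdint>

// ASSUMED formula, not taken from ScyllaDB: reject the fraction of traffic
// by which the counter exceeds the limit.
double rejection_probability(uint32_t counter, uint32_t limit) {
    if (counter <= limit) {
        return 0.0;
    }
    return 1.0 - static_cast<double>(limit) / static_cast<double>(counter);
}

// Every replica receives the same `coordinator_random` in [0, 1) with the
// request and compares it against its own locally computed probability.
bool replica_rejects(uint32_t counter, uint32_t limit, double coordinator_random) {
    return coordinator_random < rejection_probability(counter, limit);
}

// Two replicas disagree only when the shared random number falls between
// their two probabilities, so this is the chance of a split decision.
double disagreement_window(uint32_t counter_a, uint32_t counter_b, uint32_t limit) {
    return std::fabs(rejection_probability(counter_a, limit) -
                     rejection_probability(counter_b, limit));
}

// Average read rate seen by one replica when only some replicas are
// contacted per read, assuming the contacted replicas are chosen evenly.
double observed_read_rate(double true_rate, int replicas_contacted,
                          int replication_factor) {
    return true_rate * replicas_contacted / replication_factor;
}
```

Sharing one random number is what makes nearby counters agree: if each replica drew its own number, two replicas with identical counters would still disagree some of the time. Under the even-selection assumption, with replication factor 3 and two replicas contacted per read, each replica observes about two thirds of the true read rate, which is why more reads than the configured limit may be accepted.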
Exceptions are a bit notorious in the C++ community. There are many problems with their design, but they are most commonly criticized for bloating binaries with metadata necessary for exception handling, and because program performance when an exception is thrown can be hard to predict. Some projects using C++ go as far as to disable exceptions altogether for these reasons. On the other hand, it’s hard to avoid exceptions because they are thrown by the standard library and because of their interactions with other language features.

Our asynchronous framework, Seastar, embraces exceptions and provides facilities for handling them correctly. ScyllaDB is built upon Seastar, so we got used to reporting failures via exceptions. They work fine provided that errors aren’t very frequent. However, during overload this assumption is no longer true, and we noticed that throwing exceptions in large volumes can introduce a performance bottleneck. This problem affects both existing errors such as timeouts and the new rate limit exception that we tried to introduce.

We had some performance issues with exceptions before. In libstdc++, the standard library for GCC, throwing an exception involves acquiring a global mutex. That mutex protects some information important to the runtime that can be modified when a dynamic library is being loaded or unloaded. ScyllaDB doesn’t use dynamic libraries, so we were able to disable the mutex with some clever workarounds. As an unfortunate side effect, we disabled some caching functionality that speeds up further exception throws. However, avoiding the scalability bottleneck was more important to us, so it was a good trade-off.

Exceptions are usually propagated up the call stack until a try-catch block is encountered. However, in programs with non-standard control flow, it sometimes makes sense to capture the exception and rethrow it elsewhere. For this purpose, std::exception_ptr can be used. Our Seastar framework is a good example of code that uses it. Seastar allows running concurrent asynchronous tasks which can wait for the results of each other via objects called futures. If a task results in an exception, it is stored in the future as an std::exception_ptr and can later be inspected by a different task.

What is nice is that Seastar reduces the time spent in the exception handling runtime. In most cases Seastar is not interested in the exception itself, but only in whether a given future contains an exception or not. Because of that, the common control flow primitives such as then, finally, etc. do not have to rethrow and inspect the exception; they only check whether an exception is there. In addition to this, it is possible to construct an exceptional future directly without throwing the exception.

Unfortunately, the standard library doesn’t make it possible to inspect an std::exception_ptr without rethrowing it. Because each exception has to be inspected and handled appropriately at some point, sometimes more than once, it is impossible for us to get rid of throws if we want to use exceptions and have portable code. We had to provide a cheaper way to return timeouts and rate limit exceptions, and we tried two approaches.

The first approach that we explored was quite heavy-handed. It involved plowing through the code and changing all the important functions to return a Boost result, which is a type from the Boost library that holds either a successful result or an error. We also introduced a custom container for holding exceptions that allows inspecting the exception inside it without having to throw it. This approach worked but had numerous drawbacks. First, it required a lot of work to adapt the existing code: return types had to be changed, and the results returned by the functions had to be explicitly checked for whether they hold an exception or not. Moreover, those checks introduce a slight overhead on the happy path. We only applied this approach to the coordinator logic.

The other approach that we used was to implement inspecting the value of std::exception_ptr ourselves. We encountered a proposal to extend std::exception_ptr’s interface, which included adding the possibility to inspect its value without rethrowing it. The proposal includes an experimental implementation which uses standard-library-specific and ABI-specific constructs to implement it for some of the major compilers. Inspired by it, we implemented a small utility function which allows matching on the exception’s type and inspecting its value, and replaced a bunch of try-catch blocks with it. We got rid of throws in the most important parts of the replica logic in this way. It was much less work than the previous approach, as we only had to get rid of try-catch blocks and make sure that exceptions are propagated using the existing non-throwing primitives. While the solution is not portable, we use and support only a single toolchain at a time to build ScyllaDB, so it works for us.
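The difference between inspecting an std::exception_ptr portably and returning errors as values can be sketched as follows. This is a simplified illustration, not ScyllaDB or Seastar code: the error types and function names are hypothetical, and `std::variant` stands in for the Boost result type mentioned in the talk. The second, ABI-specific approach is deliberately not shown because it depends on compiler internals.

```cpp
#include <exception>
#include <stdexcept>
#include <variant>

// Hypothetical error types used only for this sketch.
struct timeout_error : std::runtime_error {
    using std::runtime_error::runtime_error;
};
struct rate_limit_error : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Portable inspection: the only standard way to learn what an
// std::exception_ptr holds is to rethrow it and catch it again, which goes
// through the exception handling runtime every time.
// Precondition: `p` must not be null.
bool is_timeout_by_rethrow(const std::exception_ptr& p) {
    try {
        std::rethrow_exception(p);
    } catch (const timeout_error&) {
        return true;
    } catch (...) {
        return false;
    }
}

// Value-based alternative: the function returns either a value or an error
// object, and the caller inspects it with an ordinary type check.
template <typename T>
using result = std::variant<T, rate_limit_error, timeout_error>;

// Hypothetical operation that fails without throwing.
result<int> write_row(bool over_limit) {
    if (over_limit) {
        return rate_limit_error("per-partition rate limit exceeded");
    }
    return 42;
}

template <typename T>
bool is_rate_limited(const result<T>& r) {
    return std::holds_alternative<rate_limit_error>(r);
}
```

The value-based version avoids the throw entirely, but every caller has to check the returned object explicitly, which matches the drawback described above: more code to adapt and a small cost on the happy path.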
And now let’s see some results. We conducted two benchmarks to show the benefits of our improvements: one for per-partition rate limiting and the other for the speedups in exception handling.

Here is a benchmark that demonstrates the usefulness of per-partition rate limiting. We set up a cluster on AWS consisting of three i3.4xlarge nodes with ScyllaDB 5.1.0-rc1. We pre-populated it with a small data set which fits in memory. First, we run a uniform read workload of 80k reads per second from a c5.4xlarge instance. This represents the first section of the charts. Important note: the charts show per-shard measurements.

Next, we start another loader instance, but instead of reading uniformly, it performs heavy queries on a single partition only, with eight times as much concurrency and 10 times the data fetched for each read. As expected, three shards become overloaded. Because the benchmarking application uses fixed concurrency, it becomes bottlenecked on the overloaded shards, and its read throughput drops from 80k to just 26k requests per second.
Finally, we apply a per-partition limit of 10 reads per second, and we can see that the throughput recovers. Even though the shards that were previously overloaded now coordinate many more requests per second, nearly all of the problematic requests are rejected, which is much cheaper than trying to actually handle them and is still within the capabilities of the shards.

Here is another benchmark that demonstrates better stability due to the optimizations on the exception handling path. A three-node cluster similar to the previous one was subjected to a write workload directed at a single partition. The data included in each request is very small, so the workload is not I/O bound but rather CPU bound, and each request has a small server-side timeout of 10 milliseconds. The benchmarking application gradually increases its write rate from 50k to 100k writes per second, and we can observe what happens when ScyllaDB is no longer able to handle the throughput.

The slide actually shows two runs of the same benchmark, superimposed on the same charts: one for ScyllaDB 5.0.3, which handles timeouts in the old way, and the other for ScyllaDB 5.1.0-rc1, which has the exception handling improvements. We can see that the older version of ScyllaDB gets overwhelmed somewhere around the 60k writes per second mark. We are pushing the shard to its limit, and because we have set a short timeout, some requests will inevitably fail. Because processing a timed-out request is more costly than processing a regular request, the shard very quickly enters a state where all requests that it processes result in timeouts. The new version, which has better exception handling performance, is able to sustain a larger throughput. Some timeouts do occur, but they do not overwhelm the shard completely. Only at the 100k requests per second mark do timeouts start to be more prominent.

And that’s it. Thank you. [Applause]