Per-metric rate limiting: How we protect our backend
To protect our backend and ensure one customer's traffic can't affect anyone else, we carefully limit the amount of data we accept from our users. This data can arrive at different rates, and anything we suspect to be harmful is limited immediately. However, some traffic passes our initial checks but still causes problems further down the data ingestion pipeline. One case we solved recently is a user sending a high volume of datapoints per second to a single metric. That’s why we developed per-metric rate limiting: a system we built from the ground up to limit damaging traffic, even when it isn’t intended to harm.
If servers receive data at a higher rate than they can process, it can lead to an exhaustion of resources, a backlog of requests and, at worst, data loss. To alleviate the problem, it’s usually possible to follow the traditional approach to rate limiting: applying backpressure to tell the other side to slow down. This lets you control data streams so your server only receives data as fast as it can be processed.
Backpressure wasn’t an option for us, however. No individual user or metric has its own network connection over which we could apply backpressure: everything arrives mixed together in batches, over connections shared by all user data. The only point at which our pipeline has enough information to know we’re in an over-limit condition for a single metric name is too late in the process to take action.
We realised we needed a totally new approach: one that would let the traffic flow normally and use the over-limit information from late in the pipeline to change the behaviour of its earlier stages. In addition, our rate limiting system needed to react to changing traffic levels (knowing that it’s causing some of the changes itself), be consistent across machines (so that multiple servers could access the same information and react to it), and act relatively fast (for speedy reactions to changing traffic).
Two layers of traffic ingestion
Our backend deals with two layers of traffic ingestion: the first, the load balancing layer, forwards to the second, the aggregation layer. Previously, however, no communication flowed from the aggregation layer back to the load balancing layer. The traffic forwarded from the load balancing layer to the aggregation layer takes the following form:
<metric_name> <value> <timestamp>
my.metric.one.count 23 1501519326
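As a rough illustration of this format (the type and function names here are ours for the example, not part of the actual service code), each line splits into three whitespace-separated fields:

```python
from typing import NamedTuple

class Datapoint(NamedTuple):
    metric: str
    value: float
    timestamp: int

def parse_line(line: str) -> Datapoint:
    # A line is "<metric_name> <value> <timestamp>", whitespace-separated.
    metric, value, timestamp = line.split()
    return Datapoint(metric, float(value), int(timestamp))
```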
The load balancing layer
The load balancing layer is optimised for availability and receives metric traffic over all supported protocols. However, optimising for availability (and speed) has a drawback: there’s not enough time to ask all the other endpoints in this layer how much traffic they've seen, and specifically for which metric names. That would be an impractical amount of data to sync around, so we can’t decide whether the datapoint we've just received should progress to the second layer. After authentication, we forward each datapoint to a specific machine in the aggregation layer, where data aggregation happens. Yet within the load balancing layer, each machine only knows what traffic it has itself seen for each metric. As a result, it’s not possible to make rate limiting decisions about a user’s traffic across the whole set of machines in this layer either - there's still too much usage data to sync around for that to be practical.
The aggregation layer
In the aggregation layer, we receive data from the load balancing layer and aggregate metrics into queryable, useful formats. Only here do we have any oversight of how many datapoints each individual metric is receiving. If a metric's datapoint rate exceeds what this layer’s aggregation process can handle, the effectiveness of the aggregation layer is compromised: a user is breaching their account limits without us knowing, and other users can start to see some impact. Simply dropping the traffic at this level doesn't solve the problem, as we've already done almost all of the work involved in processing it. However, at this point we do at last know which metrics are seeing too much data, which leads us to our solution.
We took several steps to solve this problem. To begin with, we needed to make separate changes to the aggregation and load balancing layers. In the aggregation layer, we set up a check for offending metrics and, from that, work out what percentage of their traffic needs to be dropped. In the load balancing layer, we added the ability to drop a given percentage of traffic for specific metrics. We then needed a way of communicating this information from the aggregation layer back to the load balancing layer, where it could be acted on.
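To make the arithmetic concrete, here’s a minimal sketch of the calculation the aggregation layer needs to perform (the function name and exact formula are illustrative, not our production code): the fraction to drop is whatever brings the observed rate back down to the limit.

```python
def drop_fraction(observed_rate: float, limit: float) -> float:
    """Fraction of a metric's traffic the load balancing layer should
    drop to bring its datapoint rate back under `limit`."""
    if observed_rate <= limit:
        return 0.0  # under the limit: no dropping required
    return 1.0 - (limit / observed_rate)
```

For example, a metric arriving at 4,000 datapoints/sec against a 1,000/sec limit would need 75% of its traffic dropped.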
That’s where etcd came in: a distributed key-value store designed for coordinating the components of a distributed system. It’s one way to communicate between the two layers, and we chose it primarily for its reliability (it’s distributed using the Raft consensus algorithm), ease of use, and speed (benchmarked at 10,000 writes/sec). There were several other reasons etcd made sense for us. Firstly, we already had it installed and running in our stack, which allowed us to easily test a proof of concept without needing extra hardware or installing new tech onto our production machines. Secondly, etcd makes security and authentication more manageable: the trees storing metric information can be locked, which offers another layer of security around potentially sensitive user data. There's also a useful python-etcd client available, which fits nicely into our tech stack (most of the services that run Hosted Graphite are built in Python), and we only needed to store some very simple data (name and value).
With etcd in place, we could tell the load balancing layer what percentage of traffic to drop for a specific metric. The aggregation layer calculates what percentage of traffic needs to be dropped at each ingestion point in the load balancing layer in order to bring a metric's datapoint rate below the configured limit (say, 1,000 per second). The load balancing layer periodically syncs this set of metric names and maintains a lookup of those it needs to start dropping traffic for. When a load balancing process sees one of these metrics, it begins counting how many datapoints it has received for that metric recently. Knowing what proportion of the traffic it has seen itself, it can then drop a portion of that traffic to reduce the overall level arriving at the aggregation layer.
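A simplified sketch of what a load balancing process might do with that synced lookup (the class and attribute names are hypothetical, and the real service tracks per-metric counts over a time window rather than a bare counter):

```python
import random

class MetricLimiter:
    """Per-metric probabilistic dropping at a single load balancing
    process. `drop_fractions` stands in for the lookup synced from etcd:
    metric name -> fraction of its traffic to drop."""

    def __init__(self, drop_fractions):
        self.drop_fractions = drop_fractions
        self.seen = {}  # metric name -> datapoints counted recently

    def should_drop(self, metric: str) -> bool:
        fraction = self.drop_fractions.get(metric)
        if fraction is None:
            return False  # metric is not over its limit; pass it through
        self.seen[metric] = self.seen.get(metric, 0) + 1
        # Drop each datapoint independently with the given probability,
        # so the aggregate rate falls by roughly that fraction.
        return random.random() < fraction
```

Dropping probabilistically, rather than counting out every Nth datapoint, keeps each process stateless with respect to its peers: no coordination between load balancing machines is needed for the aggregate drop rate to come out right.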
However, a naive implementation of this creates another problem. As traffic from a ratelimited metric to the aggregation layer drops in response to the limiting at the load balancing layer, it looks as though no more limiting is required, so the system reacts by not dropping any traffic. The problem, though, is that the user hasn't changed the rate they're sending at, and we end up oscillating: limiting, then not limiting, then limiting again. This is bad for the health of the aggregation service because it doesn't solve the overload condition, and bad for the user experience because the limiting is unpredictable. The oscillation actually makes things worse in the aggregation layer, because rapidly "banging" traffic volume like this causes instability itself. Smoothing things out should be the goal.
So, using etcd's key TTL/expiry feature and some gradual backoff to recalculate the limit periodically, the limit is dynamically adjusted and will slowly return to a non-limiting state. If the user continues to send at the same rate, the limiting will "wobble" a little, but will stay mostly constant. When the user's traffic pattern changes, the limiting will react within a few seconds and either limit more, less, or not at all, as appropriate, and will do so in a controlled way.
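One way to sketch that periodic recalculation (the function name and decay factor are illustrative assumptions, not our production values): while the metric is still over its limit, the drop fraction is held at whatever is needed; once it falls back under, the fraction decays gradually toward zero instead of snapping straight to a non-limiting state.

```python
def next_drop_fraction(current: float, observed_rate: float,
                       limit: float, decay: float = 0.1) -> float:
    """Recalculate a metric's drop fraction for the next interval.
    Gradual backoff: over-limit raises the fraction as needed,
    under-limit only lowers it by `decay` per interval."""
    if observed_rate > limit:
        needed = 1.0 - (limit / observed_rate)
        return max(current, needed)  # never relax while still over limit
    return max(0.0, current - decay)  # back off slowly, not all at once
```

Combined with etcd's key TTL, a limit that stops being refreshed simply expires, so a metric that returns to normal traffic levels ends up with no entry at all.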
Through per-metric ratelimiting we’ve successfully dropped billions of datapoints that were arriving at dangerous levels, protecting our backend and providing a better experience for our users.