EVCache is a distributed in-memory caching solution based on memcached. It is a Tier-0 system at Netflix with its footprint across ~18,000 servers holding ~14 petabytes of data, and still rapidly growing.
We had previously given an overview of how we perform cache warming on our EVCache clusters and why it’s needed. The architecture mentioned in the original cache warming article worked great for a vast majority of our use cases. However, as Netflix’s subscriber base grew over the years, the footprint of the data stored in EVCache has increased multiple fold and will continue to increase to meet new and additional demand. As the scale and sensitivity of the clusters increased, the architecture needed to evolve to keep up.
In this post we discuss the various bottlenecks encountered and we will present an improved architecture with a much higher throughput suited for petabyte scale datasets by addressing the bottlenecks we faced. We will share results from our production environment which show our total warm up times reduced by ~90% as compared to our previous architecture.
We are thrilled to open-source the memcached dumper part of this project, called Cachemover, for use by the broader community. It allows one to dump memcached data to disk as fast as possible.
Figure 1 shows a high level view of what EVCache cache warming looks like.
Figure 2 below shows the architectural overview of our original cache warming system. It has three main blocks — Controller, Dumper and Populator.
To give a quick overview of how the previous cache warming process worked: the Controller creates an SQS queue for metadata, and then instructs all instances in the existing source replica to dump and upload data to S3. The dumping and uploading are performed by a Java based sidecar process on each source instance, which also continually publishes metadata updates to SQS as it uploads data to S3. A distributed Populator service is spun up to download the metadata and data from SQS and S3 respectively and write them to the new target EVCache replica/cluster.
Note that from S3’s point of view the dumper is the writer and the populator is the reader.
The previous blog post on cache warming delves into further details of this architecture if readers are interested. Next, we will highlight the challenges that arose from this architecture and how we modified the architecture to address those challenges.
The primary challenges we identified with the above architecture are:
- The dumping process is handled by the Java based sidecar (the “Cache Dumper” component in Fig. 2) and if the instances are running low on memory, we would see OutOfMemory errors.
On facing an OOM error, the OS kills either the sidecar or the memcached process. If a memcached process is killed, it is not a fatal error, as EVCache can serve data from another replica. However, it does require us to re-warm the data on that source instance. If, on the other hand, the sidecar is killed on the source instance, we need to restart the data dump from the beginning for that node. Neither scenario is ideal: both could require manual intervention, which hinders developer productivity, and both increase the cost of the cache warming process if we face a cascade of OOM failures on clusters with large datasets.
- As mentioned in the future enhancements sections of cache warming, another bottleneck with the current approach is uploading and downloading data-chunks to and from S3. We observe that the S3 network bandwidth gets throttled after a certain period which slows down the end-to-end warming process and hence implicitly increases the likelihood of unrecoverable errors like OOM and potentially affects live traffic (see next point) due to the longer runtime of the cache warming process.
- A further problem with S3 is that the upload path uses the same network as the live traffic; hence the latency and throughput of live traffic are affected during the cache warming process. To make matters worse, the longer the cache warming process takes, the longer live traffic is affected.
Note that the memcached process itself is so efficient latency-wise that our EVCache cluster performance and throughput are mostly network-bound.
The first challenge is addressed by moving to a C++ based dumper (cachemover) which can be controlled to use as few memory resources as possible or use higher memory depending on the memory available on a given instance. The second and third challenges are addressed by leveraging the multi-attach EBS volumes. Making these two improvements reduced the total warm up time by ~90%. We will present the data from our production environments in the results section.
The design goals for the cache warming system were the same as in the original post, with some additional ones:
- Limit network impact on current EVCache clients
- Minimize the memory and disk usage on EVCache nodes
- Shorten the warm up time
- Have no restrictions on when (Peak/Non-peak periods) to warm up
- Make the system configurable enough so that it can maximize the resources available to it
When we published our previous cache warmer post, AWS did not offer multi-attach EBS volumes. Without multi-attach, the dumping stage and the population stage would need to be serialized, since an EBS volume can only be attached to either a source instance or a target instance but not both at the same time. This would effectively at least double the end-to-end warming times.
The biggest motivations for using multi-attach EBS volumes were to address Challenges #2 and #3:
- With EBS, we get additional network bandwidth which is dedicated to serving EBS traffic. We didn’t want the cache warmer to eat into the network bandwidth available on the instance.
- We avoid S3 throttling due to the dedicated bandwidth to EBS.
- Also, the writers and readers can work in parallel, consuming the entire EBS bandwidth, thanks to the multi-attach functionality.
New Cache Warmer Architecture
The diagram below shows the architectural overview of our new cache warming system. The main differences are the introduction of a C++ cache dumper (called cachemover) in place of the old cache dumper, and the move to Amazon EBS instead of S3.
The higher level functionality of all the components is very much the same as the old architecture, but there are changes with respect to the implementation itself. Each critical component in the system is explained below.
Cache Warmer Controller
The Cache Warmer controller is the control plane for cache warming. It orchestrates the entire process of bringing up the new clusters, making sure that the data has been copied from old clusters to the new clusters and tearing down all the old artifacts.
As seen in Fig. 3, here is the sequence of operations that takes place:
- Cache warmer controller creates EBS volumes depending on the size of the source cluster.
- Cache warmer controller also creates the Cache Populator cluster which is responsible for reading the data from the old cluster and writing it to the destination cluster.
- Once the EBS volumes all come online/are available, the controller attaches each unique EBS volume to both a source instance and to a Cache Populator instance.
- From the source instance, the EBS volume is mounted with RW (Read-Write) permissions. The dumper (details of which are mentioned in the next section) writes the contents of the cache to EBS volume directly.
- On the destination instance, the EBS volume is mounted with RO (Read-Only) permissions. But the caveat to this is — we do not use a clustered file system on top of EBS. So the destination side can’t see the changes made to EBS volume instantaneously. Instead, the Cache Populator instance will unmount and mount the EBS volume to see the latest changes that happened to the file system. This allows both writers and readers to work concurrently, thereby speeding up the entire warming process.
- Steps 4 and 5 are replicated for each source instance and the process of warming is complete when there is no more data to be written to the destination cluster.
- Cache warmer controller will monitor all erroneous conditions and take necessary action to fix the process -
* Dumping process got killed
* Instance got terminated when the warming process is in-progress
* Data corruption
* Misc EBS volume failures
- If at any time during the above steps, we run into a failure path, the controller does a best-effort attempt to revive and continue the process. If not, the controller tears down all the resources it had created to facilitate the cache warming process.
- Once the dumping and population processes complete successfully, we tear down the cache populator instances and release EBS volumes.
The source of the data, i.e. the replica from where the data needs to be copied, can be either provided by the user or the Controller will select the replica with the highest number of items.
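The source selection rule and the one-volume-per-instance pairing described above can be sketched as follows. This is only an illustration: the `Replica` type, the function names, and the shape of the attachment plan are hypothetical and are not taken from the controller's actual code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Replica:
    # Hypothetical description of one EVCache replica.
    name: str
    item_count: int
    instances: tuple  # instance ids belonging to this replica


def choose_source_replica(replicas, user_choice=None):
    """Use the user's replica if one is given, else the one with the most items."""
    if user_choice is not None:
        for replica in replicas:
            if replica.name == user_choice:
                return replica
        raise ValueError(f"unknown replica: {user_choice}")
    return max(replicas, key=lambda r: r.item_count)


def plan_attachments(source, volumes, populators):
    """Pair every source instance with exactly one volume and one populator.

    The source side gets the volume read-write, the populator side read-only.
    """
    if not (len(source.instances) == len(volumes) == len(populators)):
        raise ValueError("need one volume and one populator per source instance")
    return [
        {"volume": v, "source": s, "source_mode": "rw",
         "populator": p, "populator_mode": "ro"}
        for s, v, p in zip(source.instances, volumes, populators)
    ]
```

For example, given two replicas with 10 and 25 items, `choose_source_replica` returns the second one unless the caller names the first explicitly.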
Cachemover (Data dumper)
The dumper is an on-demand process that runs on EVCache nodes and is part of the cachemover project. When requested, it dumps data from a memcached process in two pipelined steps:
- Key dump (or “meta-dump” in memcached lingo)
- Data dump (or dumping the values)
The first step, the key dump (or meta-dump), is performed using memcached’s LRU crawler utility or hash crawler utility. Note that it cannot be parallelized, as memcached provides both as single-threaded operations. In practice this is not an issue, as the key dump takes only a small fraction of the total dumping time.
Additionally, the key dump is pipelined with the data dump. The key dump outputs the keys to disk in chunked files; we refer to them as “key files”. The data dumper threads can start dumping as soon as the first key file chunk is available. The data dump also dumps “data files” in chunks to the target storage layer (EBS or S3). The Populator service can start populating the target cluster as soon as the first data file is available. Hence, this second layer of pipelining helps reduce the end-to-end warming time drastically.
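To make the pipelining concrete, here is a minimal sketch in which a single-threaded key dump feeds chunks to data-dump workers through a queue, so the workers start as soon as the first chunk exists. It is a toy model, not cachemover's code: the cache is a plain dict, "files" are in-memory objects, and every name is made up for illustration.

```python
import queue
import threading


def key_dump(all_keys, chunk_size, key_files):
    """Single-threaded stage: emit the keys in chunks (stand-ins for key files)."""
    for start in range(0, len(all_keys), chunk_size):
        key_files.put(all_keys[start:start + chunk_size])


def data_dump_worker(key_files, cache, data_files):
    """Turn each key chunk into a data chunk as soon as it becomes available."""
    while True:
        chunk = key_files.get()
        if chunk is None:  # sentinel: the key dump has finished
            return
        data_files.put({key: cache[key] for key in chunk})


def run_pipeline(cache, chunk_size=2, num_workers=2):
    key_files, data_files = queue.Queue(), queue.Queue()
    workers = [
        threading.Thread(target=data_dump_worker,
                         args=(key_files, cache, data_files))
        for _ in range(num_workers)
    ]
    # Workers are started first, so they consume chunks while the key dump runs.
    for worker in workers:
        worker.start()
    key_dump(sorted(cache), chunk_size, key_files)
    for _ in workers:
        key_files.put(None)
    for worker in workers:
        worker.join()
    produced = []
    while not data_files.empty():
        produced.append(data_files.get())
    return produced
```

In the real system a third stage, the Populator, would consume the data files in the same overlapping fashion.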
High Level Architecture
The cachemover dumper is implemented in C++ as a simple multi-threaded task scheduler where every stage listed above is broken down into an independent task.
- There exists a task queue which has a queue of tasks waiting to be executed.
- There are “N” task threads that each execute one task at a time.
- A task scheduler dequeues a task from the task queue and assigns it to a free task thread.
- There are “M” fixed size buffers which are allocated on startup and used throughout the lifetime of the process to handle all the data.
- The dumper tries to stick to a memory limit, which is: buffer_size x (num_threads x 2)
- Each task thread has 2 dedicated fixed size buffers. (i.e. M = N * 2)
- The dumper exits once the last task is completed.
- On startup, one task gets enqueued by default which is the meta-dump task which does a key dump to disk.
- The key dump contains only key metadata and is chunked into key files. This task does the entire first stage.
- For every key file produced, the meta-dump task enqueues a data-dump task.
- Each data-dump task looks into the key file assigned to it and requests the values from memcached for all the keys it sees.
- It then dumps the data for every key into one or more data files. A checksum is calculated for each data file as it’s written and the final file has it as part of its file name. Checksums are used by Cache Populators to validate the file integrity.
Once we process all the key files and have dumped all the data files, the dumper finally outputs a file named “DONE” to indicate that it has completed successfully.
The dumper fits into either an EBS-based architecture or an SQS/S3-based architecture. There is no tight dependency on either of these components.
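The scheduler described in this section can be modelled in a few dozen lines. The sketch below is written in Python for brevity (the real dumper is C++) and makes several assumptions that are not from the source: the cache is a dict of `str` to `bytes`, records are stored as `key=value` lines, CRC32 is the checksum, every key file fits in one buffer, every record fits in one buffer, and the file naming scheme is invented.

```python
import os
import queue
import threading
import zlib


def run_dumper(cache, out_dir, num_threads=2, buffer_size=32, keys_per_file=2):
    """Dump `cache` into out_dir and return the fixed buffer budget in bytes."""
    tasks = queue.Queue()

    def flush(view, used, name):
        # The checksum of the payload becomes part of the data file's name.
        payload = view[:used]
        path = os.path.join(out_dir, f"{name}_{zlib.crc32(payload):08x}")
        with open(path, "wb") as f:
            f.write(payload)

    def data_dump(key_file, key_buf, value_buf):
        with open(key_file, "rb") as f:
            n = f.readinto(key_buf)  # buffer 1: the keys this task must dump
        view, used, part = memoryview(value_buf), 0, 0
        stem = "data_" + os.path.basename(key_file)
        for key in bytes(key_buf[:n]).split(b"\n"):
            record = key + b"=" + cache[key.decode()] + b"\n"
            if used and used + len(record) > len(view):
                flush(view, used, f"{stem}_{part}")  # buffer 2 full: emit a file
                used, part = 0, part + 1
            view[used:used + len(record)] = record
            used += len(record)
        if used:
            flush(view, used, f"{stem}_{part}")

    def meta_dump(key_buf, value_buf):
        keys = sorted(cache)
        for i in range(0, len(keys), keys_per_file):
            path = os.path.join(out_dir, f"keys_{i // keys_per_file}")
            with open(path, "wb") as f:
                f.write("\n".join(keys[i:i + keys_per_file]).encode())
            # One data-dump task is enqueued per key file produced.
            tasks.put(lambda kb, vb, p=path: data_dump(p, kb, vb))

    def worker():
        # Each task thread owns two buffers for its whole lifetime (M = N * 2).
        key_buf, value_buf = bytearray(buffer_size), bytearray(buffer_size)
        while True:
            task = tasks.get()
            try:
                if task is None:
                    return
                task(key_buf, value_buf)
            finally:
                tasks.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    tasks.put(meta_dump)  # the only task enqueued at startup
    tasks.join()          # returns once the last data-dump task has completed
    for _ in threads:
        tasks.put(None)
    for t in threads:
        t.join()
    open(os.path.join(out_dir, "DONE"), "w").close()
    return buffer_size * num_threads * 2
```

Because the meta-dump task enqueues its data-dump tasks before it is itself marked done, `tasks.join()` cannot return early, which is what lets the sketch write `DONE` only after everything else.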
Cachemover’s dumper optimizations and how they helped
- Fixed memory usage:
One of the biggest issues we faced while using the Java based dumper architecture was the dumper process increasingly running out of memory as our cluster sizes and traffic volumes grew.
In the C++ version of the dumper, we can configure the dumper to not use more than a configurable amount of memory. Internally, this is managed by using fixed size buffers that are reused throughout the lifetime of the process.
Each worker thread is given 2 of said fixed size buffers. It uses one of them to read from their assigned key files to maintain the list of keys it needs to dump, and uses the other to obtain the values from memcached in bulk. It then eventually writes out the data from the second buffer (that holds the values) to the data files.
We’ve also used internal C++ based data structures as conservatively as possible to not have more than a 10% overhead for untracked memory.
This allows us to configure the dumper to use as much or as little memory depending on the available memory in EVCache clusters.
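As a rough illustration of how such a budget could be turned into a buffer size, the helper below divides a memory limit across two buffers per thread after setting aside headroom for untracked allocations. How cachemover actually accounts for the ~10% overhead is not stated here, so the reservation rule and the function name are assumptions.

```python
def plan_buffers(memory_limit_bytes, num_threads, overhead_percent=10):
    """Return (buffer_count, buffer_size) that fits within the memory limit.

    Assumption: untracked memory is at most `overhead_percent` of the tracked
    buffer memory, so the buffers may use limit * 100 / (100 + overhead_percent).
    """
    if memory_limit_bytes <= 0 or num_threads <= 0:
        raise ValueError("memory limit and thread count must be positive")
    buffer_count = num_threads * 2  # two fixed-size buffers per worker thread
    tracked_budget = memory_limit_bytes * 100 // (100 + overhead_percent)
    return buffer_count, tracked_budget // buffer_count


# Example: a 1100-byte limit with 5 threads leaves 1000 bytes for 10 buffers.
count, size = plan_buffers(1100, 5)
print(count, size)  # 10 100
```

Integer arithmetic is used on purpose so the result never rounds up past the limit.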
- Zero-copy on hot paths:
The dumper internally avoids copying bytes from buffers of data we read from Memcached. Since we’re talking about the order of hundreds of millions of keys per memcached instance, any extra work per key in the dumper can slow down the process as a whole. For this reason, we do not attempt to modify or copy keys or values from memcached within the dumper.
However, as we bulk request data from memcached, it returns values with extra keywords like “VALUE” before every value which would make the data files much larger if we just dumped them as is. So we rewrite the values in a more efficient binary format while writing to disk.
So if we don’t copy strings in user space, how do we dump the values to the data files in this different format? We use I/O vectors (iovec) and point them only at the parts of the input buffer that we want to write out, effectively providing zero-copy in user space. This works well because a vectored write gathers many such pieces in a single call, allowing us to write multiple keys/values out to disk in bulk.
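The same idea can be demonstrated with Python's `os.writev` (Unix only) and `memoryview` slices. The sketch parses a memcached text-protocol `get` response and builds a gather list whose key and value entries point into the original response buffer. The on-disk layout (a little-endian 2-byte key length and 4-byte value length before each record) is invented for this example and is not cachemover's actual format.

```python
import os
import struct


def gather_records(response):
    """Build a writev gather list for a memcached text `get` response.

    Key and value entries are memoryview slices of `response`, so their bytes
    are not copied in user space; only the 6-byte length headers are new.
    """
    view = memoryview(response)
    iov, pos = [], 0
    while not response.startswith(b"END\r\n", pos):
        # Each item looks like: VALUE <key> <flags> <bytes>\r\n<data>\r\n
        key_start = pos + len(b"VALUE ")
        key_end = response.index(b" ", key_start)
        flags_end = response.index(b" ", key_end + 1)
        line_end = response.index(b"\r\n", flags_end + 1)
        value_len = int(response[flags_end + 1:line_end])
        value_start = line_end + 2
        iov.append(struct.pack("<HI", key_end - key_start, value_len))
        iov.append(view[key_start:key_end])
        iov.append(view[value_start:value_start + value_len])
        pos = value_start + value_len + 2  # skip the data block's trailing \r\n
    return iov


def write_records(fd, iov):
    # One system call writes every piece. A production version would also
    # handle short writes and split lists longer than the platform's IOV_MAX.
    return os.writev(fd, iov)
```

Values are located by their declared length rather than by searching for a terminator, so payloads that happen to contain `\r\n` are handled correctly.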
- Checkpoint support:
If for any reason the dumper fails to run to completion (e.g., the kernel kills it due to OOM), we can skip over all the previously dumped keys and restart from where we left off.
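One plausible way to get this behaviour is an append-only checkpoint log of finished key files, consulted on restart. This is an assumption about the mechanism, not a description of how cachemover implements it; the log format and function names are hypothetical.

```python
import os


def record_done(checkpoint_path, key_file):
    """Durably note that every key in `key_file` has been dumped."""
    with open(checkpoint_path, "a") as log:
        log.write(key_file + "\n")
        log.flush()
        os.fsync(log.fileno())


def load_done(checkpoint_path):
    """Return the set of key files recorded as finished by earlier runs."""
    try:
        with open(checkpoint_path) as log:
            # A final line without a newline may be a torn write; ignore it.
            return {line.rstrip("\n") for line in log if line.endswith("\n")}
    except FileNotFoundError:
        return set()


def remaining_key_files(key_files, checkpoint_path):
    """Key files that still need a data dump, in their original order."""
    done = load_done(checkpoint_path)
    return [name for name in key_files if name not in done]
```

On restart the dumper would call `remaining_key_files` and enqueue data-dump tasks only for what it returns.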
Cache populator clusters are deployed on demand by the Cache Warmer Controller for a specific warming activity. Once deployed, the controller attaches an EBS volume each to every populator node. Note that the controller ensures there’s 1-to-1 mapping between a source EVCache server node to a populator node. The multi-attached EBS volume is attached to both these nodes so that populators can read data written by a dumper in a pipelined manner.
The Populator is a worker that is tasked with populating the destination replicas. It gets the information about the mount path and destination/new replica through a dynamic property which is set by the Controller.
A populator workflow iterates files in a volume as follows. Note that in Steps 6 and 8, it reports metrics back to the controller so we can monitor the progress.
- It will mount the EBS volume in “Read Only” / “No Recovery” mode, and list data file chunks written by the dumper.
- It ignores files with partial file system metadata. Cache populators can see incomplete file system metadata due to the way it mounts the filesystem, with “No Recovery” option. It proceeds with complete files, leaving incomplete files to be processed by a subsequent iteration. This is explained further in step 7.
- It also excludes files that are already processed by a previous iteration (Step 6)
- Files with complete metadata are queued and a populator worker pool is created to process files from the queue
- Each populator worker thread dequeues file name, verifies the checksum, reads key-values from the data file and writes them to the destination replica. The checksum verification process is important as we could see partial files though metadata is complete. We ignore files where checksums don’t match as a subsequent iteration can process them.
- Upon completion, a data file is marked as completed, so a future iteration will not reprocess that file.
- The main thread waits until all workers are done, unmounts the EBS volume, and sleeps a few seconds before going back to Step 1. Note that this unmount and mount is necessary to see newly written files and to resynchronize the metadata of files where it saw partial metadata.
- Once the populator sees the DONE file, which demarcates the end of a node dump, it exits the workflow and marks the activity as complete.
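The workflow above can be condensed into a loop like the following. It is a simplified sketch: files are processed sequentially instead of by a worker pool, metrics reporting is left out, mounting and unmounting are injected as callables, and the naming convention (a `data_` prefix with a trailing CRC32 in hex) is an assumption made for the example.

```python
import os
import time
import zlib


def checksum_ok(path, chunk_size=1 << 20):
    """Compare the file's CRC32 with the hex suffix of its name."""
    expected = os.path.basename(path).rsplit("_", 1)[-1]
    crc = 0
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            crc = zlib.crc32(chunk, crc)
    return f"{crc:08x}" == expected


def populate(volume_dir, mount, unmount, write_file, pause=0.0):
    """Process data files until DONE is seen and nothing is left over."""
    processed = set()
    while True:
        mount()  # e.g. read-only, no-recovery mount of the shared volume
        names = sorted(os.listdir(volume_dir))
        done_seen = "DONE" in names
        pending = [n for n in names
                   if n.startswith("data_") and n not in processed]
        for name in pending:
            path = os.path.join(volume_dir, name)
            if not checksum_ok(path):
                continue  # possibly partial; a later iteration retries it
            write_file(path)  # write its key-values to the destination replica
            processed.add(name)
        unmount()
        if done_seen and all(n in processed for n in pending):
            return processed
        time.sleep(pause)  # wait before remounting to pick up new files
```

Note that a file whose checksum never matches would keep this sketch looping after `DONE` appears; a real populator would need a retry limit or an error path for that case.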
Cache populators are short lived and will get torn down once the warm up activity is completed.
The largest cache that we have warmed up is about 1.7 PB and has about 95 billion items. This cluster has 270 nodes per replica. Hence each instance in the replica carries ~6.5 TB of data. The cache copy took about 9 hours with 270 populator instances.
In contrast, the previous architecture’s benchmark showed warming 700 TB and 46 billion items across 380 nodes per replica took about 24 hours with 570 populator instances. Note that the cluster used for the previous architecture’s benchmark used smaller instance types, each instance having carried ~1.8 TB of data, hence it had more nodes per replica.
To compare the true improvement between both the architectures, we need to compare the throughput of data movement at an instance level, as opposed to comparing the end-to-end warming times. This is because the number of instances in the clusters in both benchmarks are different, hence the level of parallelism also changes. So using the benchmark numbers above, we’ll see how much data a single instance moved per hour in both architectures.
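Working only from the per-instance data sizes and warm-up durations quoted above (~6.5 TB in about 9 hours versus ~1.8 TB in about 24 hours), the per-instance throughput can be computed as follows. The function name is ours, and the inputs are the rounded figures from the text.

```python
def tb_per_instance_hour(tb_per_instance, hours):
    """Data moved by one source instance per hour of warming."""
    return tb_per_instance / hours


new_arch = tb_per_instance_hour(6.5, 9)    # about 0.72 TB per instance-hour
old_arch = tb_per_instance_hour(1.8, 24)   # 0.075 TB per instance-hour
speedup = new_arch / old_arch              # about 9.6x

print(f"new: {new_arch:.2f} TB/h, old: {old_arch:.3f} TB/h, "
      f"speedup: {speedup:.1f}x")
```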
We see that there is ~9–10x improvement in the per-instance warming throughput in our new architecture which in turn indicates that every 1TB of data moves ~9–10x faster per instance through the new architecture. Moving the data faster provides better data quality and reduces the likelihood of system errors like OOMs, etc. An additional benefit is the reduced cost of using ephemeral hardware like the populator instances, since they are torn down much faster.
The charts below show the warming activity in action.
EBS Volume Type:
We use the EBS IO2 volume type which provides sustained IOPS (I/O Operations per second) performance since our workload is I/O intensive. We also make sure to use the multi-attach enabled drives so that we can connect the drive to the source and target instance at the same time. Lastly, we provision drives with 5000 IOPS as that’s in line with how frequently our cachemover dumper and populator processes write to and read from disk.
EBS Throughput and Bandwidth:
We were able to obtain write bandwidths of up to 350 MBps, and read bandwidths of up to 300 MBps. You can also see in the throughput charts in Figure 11 that read and write throughput collectively hovers slightly below 5000 IOPS, which is exactly how much we have provisioned.
The read and write latencies averaged around 5 ms and 3 ms respectively using EBS, which is much faster than uploading to or downloading from S3. While the upload bandwidth to S3 is arguably higher, we’ve empirically found that the cachemover dumper cannot write faster than ~350 MBps. This is because it retrieves values from memcached in real time and so can only write as fast as memcached responds to it. Also, the dumper runs in a resource-constrained environment where it’s only given a limited amount of memory to work with. Therefore, it cannot take advantage of S3’s increased upload bandwidth as it cannot hold that much data in memory.
Due to this, with S3, we have to follow a two-step process: we first need to stage the data on a local disk and then upload it to S3. This adds write overhead, and S3 latencies can also go up to several hundreds of milliseconds, contributing to degraded performance when moving very large datasets. We could upload to S3 directly from memory using multipart uploads, but since the dumper process may have less memory to work with, it would warrant multiple high-latency round trips to S3. With EBS, by contrast, we only have to write a file to the volume before it’s ready for consumption. Hence, the low latency provided by EBS is a great added benefit.
In this blog, we discussed the architecture of our new cache warming infrastructure and showed how we overcame architectural bottlenecks of our previous architecture for significant performance wins.
There’s still much to do and if you want to help work on problems of a similar scale and scope, join us at the Core Data Platform Team at Netflix.