Cache warming: Leveraging EBS for moving petabytes of data

Introduction

EVCache is a distributed in-memory caching solution based on memcached. It is a Tier-0 system at Netflix, with a footprint of ~18,000 servers holding ~14 petabytes of data, and it is still growing rapidly.

Fig.1 EVCache data path for new deployments (from cache warming)

Status Quo

Figure 2 below shows the architectural overview of our original cache warming system. It has three main blocks — Controller, Dumper and Populator.

Fig.2 Cache Warmer Architecture (previous architecture)

Challenges

The primary challenges we identified with the above architecture are:

  1. The dumping process is handled by the Java-based sidecar (the “Cache Dumper” component in Fig. 2), and if the instances are running low on memory, we see OutOfMemory (OOM) errors.
    When an OOM error occurs, the OS kills either the sidecar or the memcached process. If a memcached process is killed, it is not a fatal error, as EVCache can serve data from another replica; however, it does require us to re-warm the data on that source instance. If, on the other hand, the sidecar is killed on the source instance, we need to restart the data dump from the beginning for that node. Neither scenario is ideal, and both could require manual intervention, which hinders developer productivity and increases the cost of the cache warming process if we face a cascade of OOM failures on clusters with large datasets.
  2. As mentioned in the future enhancements section of the cache warming post, another bottleneck with that approach is uploading and downloading data chunks to and from S3. We observed that S3 network bandwidth gets throttled after a certain period, which slows down the end-to-end warming process, implicitly increases the likelihood of unrecoverable errors like OOM, and potentially affects live traffic (see the next point) because of the longer runtime of the cache warming process.
  3. A further problem with S3 is that the upload path uses the same network as live traffic; hence the latency and throughput of live traffic are affected during the cache warming process. To make matters worse, the longer the cache warming process takes, the longer live traffic is affected.
    Note that the memcached process itself is so efficient latency-wise that our EVCache cluster performance and throughput are mostly network-bound.

Design goals

The design goals for the new cache warming system were the same as in the original post, with some additions:

  • Limit network impact on current EVCache clients
  • Minimize the memory and disk usage on EVCache nodes
  • Shorten the warm up time
  • Have no restrictions on when (Peak/Non-peak periods) to warm up
  • Make the system configurable enough so that it can maximize the resources available to it

Why EBS

When we published our previous cache warmer post, AWS did not offer multi-attach EBS volumes. Without multi-attach, the dumping stage and the population stage would need to be serialized, since an EBS volume can only be attached to either a source instance or a target instance but not both at the same time. This would effectively at least double the end-to-end warming times.

  • With EBS, we get additional network bandwidth that is dedicated to serving EBS traffic. We didn’t want the cache warmer to eat into the network bandwidth available on the instance.
  • We avoid S3 throttling due to the dedicated bandwidth to EBS.
  • Also, writers and readers can work in parallel and consume the entire EBS bandwidth, thanks to the multi-attach functionality.

New Cache Warmer Architecture

The diagram below shows the architectural overview of our new cache warming system. The main differences are the introduction of a C++ cache dumper (called cachemover) in place of the old cache dumper, and the move to Amazon EBS instead of S3.

Fig.3 Cache Warmer Architecture (new)

Cache Warmer Controller

The Cache Warmer controller is the control plane for cache warming. It orchestrates the entire process of bringing up the new clusters, making sure that the data has been copied from old clusters to the new clusters and tearing down all the old artifacts.

  1. The cache warmer controller creates EBS volumes based on the size of the source cluster.
  2. The controller also creates the Cache Populator cluster, which is responsible for reading the data from the old cluster and writing it to the destination cluster.
  3. Once all the EBS volumes are available, the controller attaches each EBS volume to both a source instance and a Cache Populator instance.
  4. On the source instance, the EBS volume is mounted with RW (read-write) permissions. The dumper (described in the next section) writes the contents of the cache directly to the EBS volume.
  5. On the destination instance, the EBS volume is mounted with RO (read-only) permissions. The caveat is that we do not use a clustered file system on top of EBS, so the destination side cannot see changes made to the EBS volume instantaneously. Instead, the Cache Populator instance unmounts and re-mounts the EBS volume to see the latest changes to the file system (see the mount sketch after this list). This allows writers and readers to work concurrently, thereby speeding up the entire warming process.
  6. Steps 4 and 5 are replicated for each source instance and the process of warming is complete when there is no more data to be written to the destination cluster.
  7. The cache warmer controller monitors for error conditions and takes the necessary action to fix the process, including:
    * The dumping process getting killed
    * An instance getting terminated while the warming process is in progress
    * Data corruption
    * Miscellaneous EBS volume failures
  8. If at any point during the above steps we hit a failure path, the controller makes a best-effort attempt to revive and continue the process. If that fails, the controller tears down all the resources it had created to facilitate the cache warming process.
  9. Once the dumping and population processes complete successfully, we tear down the cache populator instances and release EBS volumes.
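
To make the multi-attach setup concrete, here is a minimal sketch of how the two sides of one EBS volume might be mounted: read-write on the source EVCache node, and read-only with journal recovery skipped on the Cache Populator node. It uses the Linux mount(2) syscall with an ext4 filesystem; the device and mount-point paths are hypothetical, and our controller drives these mounts through its own tooling rather than code like this.

    // Illustrative only: how the two attachments of a multi-attached EBS
    // volume could be mounted. Requires root; paths are made up.
    #include <sys/mount.h>
    #include <cerrno>
    #include <cstdio>
    #include <cstring>

    // Source (dumper) side: read-write mount so the dumper can write data files.
    static bool MountSourceRW(const char* dev, const char* dir) {
        if (mount(dev, dir, "ext4", MS_NOATIME, nullptr) != 0) {
            std::fprintf(stderr, "RW mount failed: %s\n", std::strerror(errno));
            return false;
        }
        return true;
    }

    // Destination (populator) side: read-only mount with journal replay skipped
    // ("norecovery"), since the writer still owns the journal on its attachment.
    static bool MountDestinationRO(const char* dev, const char* dir) {
        if (mount(dev, dir, "ext4", MS_RDONLY | MS_NOATIME, "norecovery") != 0) {
            std::fprintf(stderr, "RO mount failed: %s\n", std::strerror(errno));
            return false;
        }
        return true;
    }

    int main() {
        // In practice each call runs on a different instance: the first on the
        // source EVCache node, the second on the Cache Populator node.
        MountSourceRW("/dev/xvdf", "/mnt/cachedump");
        MountDestinationRO("/dev/xvdf", "/mnt/cachedump");
        return 0;
    }

The read-only side skips journal recovery because the writer still owns the journal, and, without a clustered file system, the populator has to unmount and re-mount to pick up new files, as described above.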

Cachemover (Data dumper)

The dumper is an on-demand process, part of the cachemover project, that runs on EVCache nodes. When requested, it dumps data from a memcached process in two pipelined steps:

  1. Key dump (or “meta-dump” in memcached lingo)
  2. Data dump (or dumping the values)

High Level Architecture

The cachemover dumper is implemented in C++ as a simple multi-threaded task scheduler where every stage listed above is broken down into an independent task.

  • A task queue holds the tasks waiting to be executed.
  • There are “N” task threads that each execute one task at a time.
  • A task scheduler dequeues a task from the task queue and assigns it to a free task thread.
  • There are “M” fixed size buffers which are allocated on startup and used throughout the lifetime of the process to handle all the data.
  • The dumper tries to stick to a memory limit, which is:
    buffer_size x (num_threads x 2)
  • Each task thread has 2 dedicated fixed-size buffers (i.e. M = N * 2).
  • The dumper exits once the last task is completed.
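
The following is a minimal sketch of this fixed-memory scheduler pattern, assuming illustrative names (TaskScheduler, Buffers) rather than the actual cachemover classes: tasks are queued, N worker threads each own two fixed-size buffers allocated up front, and total buffer memory stays at buffer_size x (num_threads x 2) for the lifetime of the process.

    // Sketch of a task scheduler with a fixed memory budget; not cachemover code.
    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    struct Buffers {
        std::vector<char> keys;    // holds key metadata read from a key file
        std::vector<char> values;  // holds values fetched from memcached in bulk
    };

    class TaskScheduler {
    public:
        TaskScheduler(size_t num_threads, size_t buffer_size) {
            // The memory budget is fixed up front: buffer_size x (num_threads x 2).
            buffers_.reserve(num_threads);
            for (size_t i = 0; i < num_threads; ++i)
                buffers_.push_back({std::vector<char>(buffer_size),
                                    std::vector<char>(buffer_size)});
            for (size_t i = 0; i < num_threads; ++i)
                workers_.emplace_back([this, i] { WorkerLoop(buffers_[i]); });
        }

        void Enqueue(std::function<void(Buffers&)> task) {
            { std::lock_guard<std::mutex> lk(mu_); queue_.push(std::move(task)); }
            cv_.notify_one();
        }

        void Shutdown() {
            { std::lock_guard<std::mutex> lk(mu_); done_ = true; }
            cv_.notify_all();
            for (auto& t : workers_) t.join();
        }

    private:
        void WorkerLoop(Buffers& buf) {
            for (;;) {
                std::function<void(Buffers&)> task;
                {
                    std::unique_lock<std::mutex> lk(mu_);
                    cv_.wait(lk, [this] { return done_ || !queue_.empty(); });
                    if (queue_.empty()) return;   // done and nothing left to run
                    task = std::move(queue_.front());
                    queue_.pop();
                }
                task(buf);  // every task reuses this thread's two fixed buffers
            }
        }

        std::mutex mu_;
        std::condition_variable cv_;
        std::queue<std::function<void(Buffers&)>> queue_;
        std::vector<Buffers> buffers_;
        std::vector<std::thread> workers_;
        bool done_ = false;
    };

    int main() {
        // 4 threads x 2 buffers x 1 MiB = 8 MiB total, fixed for the process lifetime.
        TaskScheduler sched(/*num_threads=*/4, /*buffer_size=*/1 << 20);
        sched.Enqueue([](Buffers& b) { std::cout << b.values.size() << " bytes\n"; });
        sched.Shutdown();
        return 0;
    }

In the real dumper, the enqueued tasks are the meta-dump and data-dump tasks described next; the meta-dump task enqueues one data-dump task per key file it produces.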
Fig.4 Cachemover dumper in action
  1. On startup, one task is enqueued by default: the meta-dump task, which dumps the keys to disk.
  2. The key dump contains only key metadata and is chunked into key files. This task does the entire first stage.
  3. For every key file produced, the meta-dump task enqueues a data-dump task.
  4. Each data-dump task looks into the key file assigned to it and requests the values from memcached for all the keys it sees.
  5. It then dumps the data for every key into one or more data files. A checksum is calculated for each data file as it is written, and the final file carries the checksum as part of its file name (a small sketch of this follows below). Cache Populators use these checksums to validate file integrity.
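
As a small illustration of the checksum-in-the-file-name idea, the sketch below writes a data file whose CRC32 is embedded in its name so a reader can recompute and compare it. The naming pattern, the use of zlib's CRC32, and the directory are assumptions for illustration; the cachemover on-disk format is its own binary format.

    // Illustrative only: embed a checksum in the data file name so readers can
    // validate integrity. Link with -lz for zlib's crc32().
    #include <zlib.h>
    #include <cstdio>
    #include <fstream>
    #include <string>
    #include <vector>

    std::string WriteDataFile(const std::string& dir, int file_id,
                              const std::vector<char>& payload) {
        // Checksum the bytes exactly as they will be written to disk.
        uLong crc = crc32(0L, Z_NULL, 0);
        crc = crc32(crc, reinterpret_cast<const Bytef*>(payload.data()),
                    static_cast<uInt>(payload.size()));

        // Hypothetical naming scheme: data_<id>_<crc>.
        char name[256];
        std::snprintf(name, sizeof(name), "%s/data_%d_%08lx",
                      dir.c_str(), file_id, crc);

        std::ofstream out(name, std::ios::binary);
        out.write(payload.data(), static_cast<std::streamsize>(payload.size()));
        return name;
    }

    int main() {
        std::vector<char> payload = {'k', 'e', 'y', '\0', 'v', 'a', 'l'};
        std::printf("wrote %s\n", WriteDataFile("/tmp", 0, payload).c_str());
        return 0;
    }

A populator that later reads such a file can recompute the CRC over its contents and skip the file if it does not match the name, which is how partially written files are detected and retried.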

Cachemover’s dumper optimizations and how they helped

  • Fixed memory usage:
    One of the biggest issues we faced with the Java-based dumper architecture [4] was the dumper process increasingly running out of memory as our cluster sizes and traffic volumes grew.
    In the C++ version of the dumper, we can configure the dumper to not use more than a configurable amount of memory. Internally, this is managed by using fixed size buffers that are reused throughout the lifetime of the process.
    Each worker thread is given 2 of these fixed-size buffers. It uses one buffer to read from its assigned key file and maintain the list of keys it needs to dump, and uses the other to fetch the values from memcached in bulk. It then eventually writes out the data from the second buffer (which holds the values) to the data files.
    We’ve also used internal C++ data structures as conservatively as possible, so that untracked memory adds no more than a 10% overhead.
    This allows us to configure the dumper to use as much or as little memory depending on the available memory in EVCache clusters.
  • Zero-copy on hot paths:
    The dumper internally avoids copying bytes from buffers of data we read from Memcached. Since we’re talking about the order of hundreds of millions of keys per memcached instance, any extra work per key in the dumper can slow down the process as a whole. For this reason, we do not attempt to modify or copy keys or values from memcached within the dumper.
    However, when we bulk-request data from memcached, it returns values with extra keywords like “VALUE” before every value, which would make the data files much larger if we dumped them as is. So we rewrite the values in a more efficient binary format while writing to disk.
    So if we don’t copy strings in user space, how do we dump the values to the data files in this different format? We use iovec pointers and point them only at the necessary parts of the input buffer that we want to write out, effectively giving us zero-copy in user space. This works well because vectored I/O lets us write multiple keys/values out to disk in a single bulk write (a sketch follows after this list).
  • Checkpoint support:
    If for any reason the dumper fails to run to completion (e.g., the kernel kills it due to OOM), we can skip over all the previously dumped keys and restart from where we left off.
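
Below is a minimal sketch of the zero-copy write path described above: iovec entries point directly into the buffer that already holds memcached's response, skipping the protocol chatter (the "VALUE ..." header lines), so only the wanted bytes reach disk and nothing is copied in user space. The buffer contents and byte offsets are contrived for the example; the real dumper computes them while parsing the bulk-get response.

    // Illustrative only: vectored write (writev) of selected byte ranges from an
    // existing buffer, without copying them into a new buffer first.
    #include <sys/uio.h>   // writev(), struct iovec
    #include <fcntl.h>
    #include <unistd.h>

    int main() {
        // Pretend this buffer was filled by a bulk "get" from memcached.
        const char response[] =
            "VALUE key1 0 5\r\nhello\r\n"
            "VALUE key2 0 5\r\nworld\r\n"
            "END\r\n";

        // Point iovecs only at the value bytes we actually want on disk.
        struct iovec iov[2];
        iov[0].iov_base = const_cast<char*>(response) + 16;  // "hello"
        iov[0].iov_len  = 5;
        iov[1].iov_base = const_cast<char*>(response) + 39;  // "world"
        iov[1].iov_len  = 5;

        int fd = open("/tmp/datafile_example", O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (fd < 0) return 1;
        // One system call gathers both values out to disk.
        ssize_t n = writev(fd, iov, 2);
        close(fd);
        return n == 10 ? 0 : 1;
    }

In the actual data files the values are framed in a compact binary format rather than written back-to-back like this, but the mechanism of pointing iovecs into the read buffer is the same idea.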

Cache Populator

Fig.5 Cache Populators read completed data files from EBS and write to new clusters, pipelining data dumping with data populating
  1. The populator mounts the EBS volume in "Read Only" / "No Recovery" mode and lists the data file chunks written by the dumper.
  2. It ignores files with partial file system metadata. Cache Populators can see incomplete file system metadata because of the way the volume is mounted, with the "No Recovery" option. The populator proceeds with the complete files, leaving incomplete files to be processed by a subsequent iteration. This is explained further in Step 7.
  3. It also excludes files that were already processed by a previous iteration (see Step 6).
  4. Files with complete metadata are queued, and a populator worker pool is created to process the files from the queue.
  5. Each populator worker thread dequeues a file name, verifies the checksum, reads the key-values from the data file, and writes them to the destination replica. The checksum verification is important because we can see partial files even though their metadata is complete. We ignore files whose checksums don’t match, since a subsequent iteration can process them.
  6. Upon completion, a data file is marked as completed so that a future iteration will not reprocess it.
  7. The main thread waits until all workers are done, unmounts the EBS volume, and sleeps for a few seconds before going back to Step 1. Note that this unmount and re-mount is necessary to see newly written files and to resynchronize the metadata of files for which it previously saw partial metadata.
  8. Once the populator sees the DONE file, which marks the end of a node dump, it exits the workflow and marks the activity complete (this loop is sketched below).
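
The sketch below summarizes this loop (Steps 1 through 8), with illustrative stand-ins for the parts described elsewhere: RemountReadOnly/Unmount wrap the read-only, no-recovery mount from the controller section, ChecksumMatches verifies the name-embedded checksum, ProcessFile reads the key-values and writes them to the destination replica, and the set of completed files is tracked in memory here purely for illustration.

    // Rough sketch of the populator's outer loop; names and file layout are
    // assumptions, not the actual populator implementation.
    #include <chrono>
    #include <filesystem>
    #include <iostream>
    #include <set>
    #include <string>
    #include <thread>

    namespace fs = std::filesystem;

    void RemountReadOnly(const std::string& mnt) { /* RO + norecovery mount */ }
    void Unmount(const std::string& mnt) { /* umount(2) */ }
    bool ChecksumMatches(const fs::path& file) { /* recompute CRC, compare to name */ return true; }
    void ProcessFile(const fs::path& file) { /* read key-values, write to new replica */ }

    int main() {
        const std::string mnt = "/mnt/cachedump";   // hypothetical mount point
        std::set<std::string> completed;            // data files already populated

        for (;;) {
            RemountReadOnly(mnt);                             // step 1
            if (!fs::is_directory(mnt)) break;                // nothing mounted; bail out in this sketch
            bool saw_done = fs::exists(fs::path(mnt) / "DONE");

            for (const auto& entry : fs::directory_iterator(mnt)) {
                const std::string name = entry.path().filename().string();
                if (name.rfind("data_", 0) != 0) continue;    // not a data file
                if (completed.count(name)) continue;          // step 3: already processed
                if (!ChecksumMatches(entry.path())) continue; // step 5: partial file, retry later
                ProcessFile(entry.path());                    // step 5
                completed.insert(name);                       // step 6
            }

            Unmount(mnt);                                     // step 7: resync file metadata
            if (saw_done) break;                              // step 8: node dump finished
            std::this_thread::sleep_for(std::chrono::seconds(5));
        }
        std::cout << "populated " << completed.size() << " data files\n";
        return 0;
    }

Because the checksum check and the completed-file bookkeeping make each file idempotent to process, the loop can safely re-scan the volume on every iteration while the dumper is still writing to it.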

Results

The largest cache that we have warmed up is about 1.7 PB and has about 95 billion items. This cluster has 270 nodes per replica. Hence each instance in the replica carries ~6.5 TB of data. The cache copy took about 9 hours with 270 populator instances.

Fig.6 Throughput comparison between old and new architectures
Fig. 7 & 8: Memory used per host as items fill up the target cluster
Fig.9 Number of keys written from the populator service to new replica
Fig.10 EBS Drive type — IO2 w/ Multi-attached at 5000 IOPS
Fig.11 EBS Metrics during cache warming

Conclusion

In this blog, we discussed the architecture of our new cache warming infrastructure and showed how we overcame architectural bottlenecks of our previous architecture for significant performance wins.

Acknowledgements

This work could not have been completed without the help of Arun Agrawal on behalf of the Core Data Platform Team at Netflix. It also builds on the foundational work of Deva Jayaraman.
