Powering Flickr’s Magic view by fusing bulk and real-time compute

Try it for yourself!

You can try out Flickr’s Magic View on your own photos here, and you can download a working code sample of the simplified lambda architecture here: https://github.com/yahoo/simplified-lambda

Introduction

In this post we’re going to talk about how we came up with a novel revision of the Lambda Architecture for fusing large-scale bulk compute with streaming compute to power Flickr’s Magic View. We were able to create a responsive, real time database operating at a scale of tens of billions of records, with tens to hundreds of millions of records updated per day. We turned to Yahoo’s Hadoop stack to find a way to build this at the massive scale we needed.

Magic View

Figure 1. Magic View in action

Motivation: the Magic View

Flickr’s Magic View takes the hassle out of organizing your photos by applying our computer-vision technology to automatically recognize objects or styles in your photos and present them to you in the Camera Roll’s scrolling view. This all happens in real time as soon as a photo is uploaded, it is categorized and placed into the Magic View.

Aggregating computer vision tags

When a photo is uploaded, it is processed by a computer vision pipeline to generate a set of computer vision tags, which are text labels of the contents of the image. We already had an existing architecture for stream computation of tags on upload, but to implement the Magic View, we needed to maintain per-user reverse indexes and some aggregations of the tags. And we needed to make sure all the data was consistent if a photo was added, removed or updated these indexes and aggregations would have to be updated to reflect this. Finally, we needed to initialize the system with tags for 12 billion photos and videos and run periodic backfills (every time we improved our computer vision algorithms and to cover cases where the stream compute missed images).

The Problem

We initially computed a snapshot of the Magic View indexes and aggregations using map-reduce (via Apache Oozie and Apache Pig), and we were happy with the quick turnaround time (about 7 hours). We considered updating Magic View as a daily batch job, but soon realized this would not give our users the responsive, “live” experience we wanted. So, we built a streaming data layer using Apache Storm and were soon able to update the categories in Magic View in real-time.

The next time we needed to run a backfill, we explored using this streaming layer to load the data. Unfortunately, the overhead of the read-modify-write process was simply too much for a load of this size — after kicking off the process we estimated it would take 28 days this way — much longer than the seven hours we had achieved with a bulk load.

Twenty-eight days was a non-starter – we realized we needed a way to update our bulk aggregations independently of the real-time data streaming in. Solving this problem is how we arrived at our revision to Lambda Architecture. Before digging into the solution, let’s do a quick review of the Lambda Architecture.  If you’re already familiar with it, you can skip this next section.

The Lambda Architecture

We’ll start with Nathan Marz’s book ‘Big Data’, which proposes the database concept of  ‘Lambda Architecture.’ In his analysis, he states that a database query can be represented as a function – Query – which operates on all the data:

result = Query(all data)

In the Lambda architecture, a traditional database is replaced with both a real time and a bulk database. Then query function becomes a “combiner” function of independent queries to each database:

result = Combiner(Query(real time data) + Query(bulk data))

An example of a typical Lambda Architecture is shown in figure 2. It is powered by an append-only queue for its system of record, which is fed by a real time stream of events. Periodically, all the data in the queue is fed into a bulk computation which pre-processes the data to optimize it for queries, and stores these aggregations in a bulk compute database. The real time event stream drives a stream computer, which processes the incoming events into real time aggregations. A query then goes via a query combiner, which queries both the bulk and real time databases, computes the combination, and stores the result.

Typical Lambda Architecture

Figure 2. Typical Lambda Architecture

While relatively new, Lambda Architecture has enjoyed popularity and a number of concrete implementations have been built. Some significant examples are the distributed analytics platform druid, Twitter’s Summingbird, and FiloDB. These implementations conveniently abstract away the databases behind the query combiner.

A significant advantage with this style of architecture is robustness and fault-tolerance via eventual consistency. If a piece of data is skipped in the real time compute there is a guarantee that it will eventually appear in the bulk compute database.

Criticism of the Lambda Architecture has centred around the complicated nature of the combiner. The combiner incurs a developer and systems cost from the need to maintain two different databases. It can be challenging to make sure both systems give the same result. Merging the two queries can become complicated, and finally, more points of failure may be introduced.

The “Ah-ha” Moment

Back to the problem. The data access layer we used for streaming compute uses the atomic read-modify-write pattern to ensure we write consistent data, one record-at-a-time to Apache HBase (a BigTable-style, non-relational database). Again, since this pattern was so much slower in the backfill case we needed to figure out how to get both consistent updates for streaming and  fast loads of the full dataset. Since our bulk data was static, we realized that if we relaxed the consistency constraint we could just run a fast, streaming, write-only load of the bulk data, bringing the load time back down to hours instead of days.

But how could we get around the consistency requirements? We didn’t want a bulk load to clobber data being written from the real time compute process. The insight was that we could just write bulk and streaming data to different column families in the same HBase row. So we added the concept of real time columns and bulk columns in a single row. Basically, bulk loads write to one set of columns and real time writes go to a different set of columns. Since HBase columns are sparse and data is updated relatively slowly we don’t pay much in storage or IO.

We could  now simplify the equation back to:

result = Combiner(Query(data))

The two sets of columns are managed separately by the real time and bulk subsystems. At query time, we perform a single fetch using the HBase API to get both the bulk and real time data. A separate combiner process assembles the final result.

Implementation

 Magic View backend system overview

Figure 3. Magic View Architecture

Figure 3 shows an overview of the system and our enhanced Lambda architecture. For the purposes of this discussion, a convenient abstraction is to consider that each row in the HBase table represents the current state of a given photo. The combiner stage is abstracted into a single Java process, which collects data from HBase and runs transformations on the data and sends it to a Redis cache which is used by the serving layer for the site.

Consistency on read in HBase — the combiner

We have two sets of columns to go with each row in HBase: bulk and real time. The combiner determines the final value for each attribute at read. In the case where data exists for real time but not for bulk (or vice versa) then there is only one value to choose. In the case where they both exist we always choose the real time value. This keeps the combiner very simple and fast.

There is a trick though – whenever we do a backfill, we may need to repair the row since the backfill data may be newer than any real time data that is already present. It turns out this slows down the backfill from seven hours to about 14 — still far faster than loading with read-modify-write.

Production throughput

At scale, this architecture has been able to keep up very comfortably with production load. We can simultaneously run backfills to HBase and serve user information at the same time without impacting latency or the user experience.

User experience

An important measure for how the system works is how the viewer perceives it. The slowest part of the system is paging data from HBase into the serving cache; median time for above-the-fold latency – i.e. enough data is available to render the page – is around 10ms.

Future directions

Our experience has been very positive so far with Magic View and we’re looking at how we might enable users to browse their photos in other dimensions (location or color for example). Early tests have shown that building an OLAP or data cube in this architecture is certainly possible but it’s less clear that it will scale well.

Contributors: Peter Welch, Bhautik Joshi, Hugo Haas, Srinivasan Singanallur, Ayan Ray, Pierre Garrigues, Ben Firestone, Sai Madhavan, Tim Miller

Thanks to Nathan Marz for reviewing this post.

Flickr September 2014

Like this post? Have a love of online photography? Want to work with us? Flickr is hiring mobile, back-end and front-end engineers, in our San Francisco office. Find out more at flickr.com/jobs.

Optimizing Caching: Twemproxy and Memcached at Flickr

Introduction

Flickr places a high priority on our users’ experience, and a critical part of that experience is the speed of the interface.  Regardless of the client you use to access Flickr, caching the proper data and the speed at which our servers can access cached data is critical to delivering on that quality user experience.  The more effective our caching strategy is, the better the Flickr experience will be for our users.  This is true for all the layers of caching we deploy at Flickr from the photo caches to the process data caches buried deep in the system.  In a previous post, we looked at how regional photo caching improved photo serving time in other countries, in this post we’re going to dive down into the innards of Flickr’s software stack and take a look at how we improved Memcached performance for our backend systems.

Back in the olden days (pre-2014), we accessed our Memcached systems through a mix of direct reads from our web servers and writes through a Flickr-developed proxy.  Our proprietary proxy system, Cerberus, handled a whole host of responsibilities. In addition to Memcached set operations, Cerberus managed database updates, the bulk of our Redis accesses, cache consistencyCerberus Based Memcached Architecture (which is why we directed our writes through Cerberus), and a few other miscellaneous transactions.   As Flickr’s traffic and functionality had grown, Memcached set operation performance wasn’t keeping up, so we needed to consider how to address the gap.

Since the development of Cerberus, the software landscape had drastically changed.  When we developed Cerberus, there was no comparable software available, but now several open source projects exist that provide similar proxy services.  On top of the availability of open source tools, Flickr’s traffic and usage patterns have changed over the course of a decade, changing the requirements we had for a proxy system.  Needless to say we had a lot of questions to ask ourselves before we dived into revising the caching architecture.

After years of operation, we had a good picture into the strengths and weaknesses of our current architecture, so when we started thinking about revising it, a few lessons from the past years really stood out.  One thing that we learned over the years was that Cerberus’ lack of a single purpose made it difficult to troubleshoot operational issues with Memcached.  Due to the lack of isolation, a downstream issue with a user database, could impact Memcached access time.  Whatever came next had to isolate cache requests from other data accesses.

Experience had shown us that Memcached get operation latency was a key performance metric, and we learned through trial and error that placing our current Cerberus proxy between the web servers and the Memcached hosts added more network latency than we were willing to tolerate.  Unfortunately, our options for connection pooling and more efficient use of connections were limited in PHP, so we had little recourse than to suffer with a high connection load and fluctuating connections against our Memcached servers.  The next generation system would have to carefully monitor get operation timings and ensure we didn’t introduce more latency into the process.

So as 2014 rolled around, we started to look into an alternative to Cerberus for accessing the Memcached systems.  Should we build a Cerberus 2.0?  Should we look at an open source alternative?  As we weighed our options, one alternative that stood out because it was quite successful in other parts of Yahoo and throughout the industry was Twitter’s Twemproxy.

Introducing Twemproxy

Twemproxy is a lightweight daemon that proxies requests to a pool of Memcached instances.  It provides the following features that we believed would improve our caching infrastructure:

  • Consistent hashing:  Figuring out what hosts in the ring on which to store and retrieve cache data.  While we had implemented consistent hashing before Twemproxy, it was previously left to the client libraries to sort out.
  • Connection pooling:  Reuse of network connections to the Memcached instances, cutting down significantly on the connection load to the Memcached daemons.
  • Command Pipelining:  Accumulating requests destined for the same host and sending as a combined payload.  This feature further reduces connection load and network overhead to the cache processes.

Resulting Architecture

We implemented a solution where each web server host had two local Twemproxies, forwarding to the Memcached rings.

Twemproxy Based Memcached Architecture

In the resulting architecture, all Memcached operations go through twemproxy.  The change accomplished many goals, including:

  • Providing a dedicated system for Memcached requests that was isolated from other systems
  • Reducing the connection load on our Memcached servers through Twemproxy’s connection pooling.  We experienced a 75% overall reduction of TCP connections to Memcached nodes
  • Improved overall caching latency.  This was a benefit that we didn’t necessarily expect.  With Twemproxy, we found that get operations had a 5% reduction in mean processing time and set operations had a 40% in mean processing time

The Road to Twemproxy

As nice as it would be to say we dropped in Twemproxy, declared victory and went for ice cream, we still had to solve a few interesting challenges along the way: maintaining availability, dealing with disparate consistent hashing schemes, and re-implementing cache coherency.

If One is Good, Two is Even Better

From the start, we recognized that the simple daemon model of Twemproxy would need to be managed carefully.  Each deploy through our continuous deployment system could result in changes to the Memcached hosts configuration.  And unfortunately, configuration changes for Twemproxy require the process to be restarted to take effect.  We measured the time to restart Twemproxy in the 1-2 second range, but even for these necessary restarts, it was too much of an interruption for the clients.

Our solution was to run two instances of the daemon on every host that needed the service and manage a careful synchronization between the restarts.  This restart “dance” was wired into the process that deploys the configuration changes to all the Memcached clients.  A couple of patches have been proposed to allow configuring Twemproxy without restarting it, but none of them have yet made the master branch.

When is Ketama not Ketama?

Ketama is a popular consistent hashing algorithm used by many systems to determine where to place a particular key in a multi-node caching system.  Out of the box, Twemproxy uses an implementation of the Ketama algorithm that is compatible with libketama, the C library which is the most commonly used implementation of the Ketama algorithm.

Our initial implementation of consistent hashing was done using Ketama, but with the Spymemcached Java library.  It turns out that Spymemcached has a slight variation in the implementation that makes it incompatible with Twemproxy.

Our transition from our current system to Twemproxy had to happen live, and a sudden change in the cache algorithm would have a painful (and unacceptable) impact on our database systems.  How could we get across this bridge?  Ultimately, we had to patch Twemproxy’s implementation of Ketama to match Spymemcached to maintain a consistent implementation of the Ketama algorithm.

Redis latency in propagating cache clears

Until we figure out how to change the speed of light, the only way we are going to make Flickr fast across the world, is through multiple data centers conveniently located near our users.  While this is way easier than changing the speed of light, it’s not without its complications.

What do they compute at Night ?

Caches between the data centers have to be kept consistent.  Some caches, like photo caches, deal with immutable data and are easy to keep in synch, others like Memcached systems have read-write data which is harder.  Our approach to handling cache consistency in our Memcached systems was to invalidate stale keys in other colo facilities whenever a process updated a value.  As we mentioned previously, Memcache write operations were directly through our Cerberus proxy specifically so Cerberus could dispatch a cache invalidation event to other colo facilities.  The migration to Twemproxy would not be complete, until we implemented a new solution for cache invalidation.

In our Twemproxy-based architecture, we decided to take the responsibility for cache invalidation our of the hands of the data proxy and push it into the client libraries we used to access Memcached.  Whenever a client updates a Memcache key, it enqueues a corresponding cache invalidation event into a distributed Redis queue.  We then deployed simple, single-purpose Java daemons to process the cache invalidation events from the Redis queue and delete the corresponding keys in their local Memcached systems.  A diagram of the system, appears below: Cache Clear - Blog Post

The wrinkle with this approach was that the enqueuing of clear keys would occasionally take 20 times longer than the normal mean time, pushing cache sets up to 40ms. After much digging, we found that the spikes were happening when the clearing daemons dequeued a batch of keys.  The dequeuing daemons were performing operations across a WAN. Due to the single-threaded nature of Redis, it would periodically block the queue for adding keys for 10s of milliseconds.  Once we figured that out, the fix was a matter of keeping separate in- and out-queues, and moving the keys from in to out with a local daemon, which significantly reduced the blocked time for writing keys.

Conclusion

Caching is crucial to a high-traffic site like Flickr, and we have taken a big stride in making our Memcached utilization more effective.  Using Twemproxy, we were able to clean up an internal system, reduce the connection load on our caching daemons, and even make modest improvements to caching latency for all clients.  Although we faced some technical challenges in implementing twemproxy for Memcached, particularly in propagating cache clear events, it was ultimately well worth the engineering investment.  After several months, our implementation of Twemproxy has proven to make a positive contribution to caching speed and ultimately the experience of a responsive site for our users.

If you dream in low latency and love to rip that extra 10 microseconds of overhead out of an operation, we’d love to have you! Stop by our Jobs page and tell us how awesome you are.