Showing posts with label logging. Show all posts
Showing posts with label logging. Show all posts

Monday, December 9, 2013

Announcing Suro: Backbone of Netflix's Data Pipeline




To make the best business and technical decisions, it is critical for Netflix to reliably collect application specific data in a timely fashion. At Netflix we deploy a fairly large number of AWS EC2 instances that host our web services and applications. They collectively emit more than 1.5 million events per second during peak hours, or around 80 billion events per day. The events could be log messages, user activity records, system operational data, or any arbitrary data that our systems need to collect for business, product, and operational analysis.

Given that data is critical to our operations and yet we allow applications to generate arbitrary events, our data pipeline infrastructure needs to be highly scalable, always available, and deliver events with minimal latency, which is measured as elapsed time between the moment when an event is emitted and when the event is available for consumption by its consumers. And yes, the data pipeline needs to be resilient to our own Simian Army, particularly the Chaos Monkeys.

While various web services and applications produce events to Suro, many kinds of consumers may process such data differently. For example, our Hadoop clusters run MapReduce jobs on the collected events to generate offline business reports. Our event stream clusters generate operational reports to reflect real-time trends. Since we may dispatch events to different consumers based on changing needs, our data pipeline also needs to be dynamically configurable.  

Suro, which we are proud to announce as our latest offering as part of the NetflixOSS family, serves as the backbone of our data pipeline. It consists of a producer client, a collector server, and plugin framework that allows events to be dynamically filtered and dispatched to multiple consumers.

History of Suro

Suro has its roots in Apache Chukwa, which was initially adopted by Netflix. The current incarnation grew out of what we learned from meeting the operational requirements of running in production over the past few years. The following are notable modifications compared to Apache Chukwa:
  • Suro supports arbitrary data formats. Users can plug in their own serialization and deserialization code
  • Suro instruments many tagged monitoring metrics to make itself operations friendly
  • Suro integrates with NetflixOSS to be cloud friendly
  • Suro supports dispatching events to multiple destinations with dynamic configuration
  • Suro supports configurable store-and-forward on both client and collector

Overall Architecture

The figure below illustrates the overall architecture of Suro. It is the single data pipeline that collects events generated by Netflix applications running in either AWS cloud or Netflix data centers. Suro also dispatches events to multiple destinations for further processing.



Such arrangement supports two typical use cases: batched processing, and real-time computation.


Batch Processing


Many analytical reports are generated by Hadoop jobs. In fact, Suro was initially deployed just to collect data for our Big Data Platform team’s Hadoop clusters. For this, Suro aggregates data into Hadoop sequence files, and uploads them into designated S3 buckets. A distributed demuxing cluster demuxes the events in the S3 buckets to prepare them for further processing by Hadoop jobs. We hope to open source the demuxer in the next few months. Our Big Data Platform team has already open sourced pieces of our data infrastructure, such as Lipstick and Genie. Others will be coming soon. A previous post titled Hadoop Platform as a Service in the Cloud covers this in detail.

Real-Time Computation


While offline/batch processing the events still form the bulk of our consumer use cases, the more recent trend has been in the area of real-time stream processing. Stream consumers are typically employed to generate instant feedback, exploratory analysis, and operational insights. Log Summaries of application-generated log data is an example that falls under this bucket. The following graph summarizes how log events flow from applications to two different classes of consumers.



  1. Applications emits events to Suro. The events include log lines.
  2. Suro dispatches all the events to S3 by default. Hadoop jobs will process these events.
  3. Based on a dynamically configurable routing rule, Suro also dispatches these log events to a designated Kafka cluster under a mapped topic. 
  4. Druid cluster indexes the log lines on the fly, making them immediately available for querying. For example, our service automatically detects error surges for each application within a 10 minute window and sends out alerts to application owners:


    Application owners can then go to Druid’s UI to explore such errors:
  5. A customized ElasticSearch cluster also ingests the same sets of log lines, de-duplicates them, and makes them immediately available for querying. Users are able to jump from an aggregated view on the Druid UI to individual records on ElasticSearch’s Kibana UI to see exactly what went wrong.



Of course, this is just one example of making use of real-time analysis. We are also actively looking into various technologies such as Storm and Apache Samza to apply iterative machine learning algorithms on application events.


Suro Collector In Detail

The figure below zooms into the design of the Suro Collector service. The design is similar to SEDA. Events are processed asynchronously in stages. Events are offered to a queue first in each stage, and a pool of threads consume the events asynchronously from the queue, process them, and sends them off to the next stage.

The main processing flow is as follows:
  1. Client buffers events in a buffer called message set, and sends buffered messages to Suro Collector.
  2. Suro Collector takes each incoming message set, deflates it if possible, and immediately returns after handing the message set to Message Set Processor.
  3. The Message Set Processor puts the message set into a queue, and has a pool of  Message Router threads that routes messages asynchronously.
  4. A Message Router determines which sink a message should go to. If there’s a filter configured for a message, the message payload will be deserialized.
  5. Each sink maintains its own queue, and sends messages to designated configurations asynchronously.

Performance Measurement

Here are some results from simple stress tests.

Test setup


Collector Cluster Size
Client Cluster Size
Message Batch Size
Message Size
Sink Config
- 3 m1.xlarge instances
- OS: Linux
- JDK: 1.7.0_25
- 6 m1.xlarge instances
- OS: Linux
- JDK: 1.7.0_25
1000
300 Bytes
- S3 Sink
- Batch: 1000
- Format: sequence file
- Compression: LZO
- Write to disk first: Yes
- Disk Type: EBS
- Notice type: SQS


Test Results


The following table summarizes the test result after warm-up:
Total Message Count
Total Message Size
Peak Throughput Per instance
Average Throughput
705 Million
211 GB
66 K msg/sec
60 K msg/sec


Suro Roadmap

The version open sourced today has the following components.
  • Suro Client
  • Suro Server
  • Kafka Sink and S3 Sink
  • Three Types of Message Filters


In the coming months, we will describe and open source other parts of the pipeline. We would love to collaborate with other solutions in the community in this domain and hope that Suro, Genie, Lipstick etc. provide some of the answers in this highly evolving and popular technology space.

Other Event Pipelines  

Suro evolved over the past few years alongside many other powerful data pipeline solutions such as Apache Flume and Facebook Scribe. Suro has overlapping features with these systems. The strength of Suro is that it is well integrated into AWS and especially the ecosystem of NetflixOSS, to support Amazon Auto Scaling, Netflix Chaos Monkey, and dynamic dispatching of events based on user defined rules. In particular,
  • Suro client has built-in load balancer that is aware of Netflix Eureka, while Suro server also integrates with Netflix Eureka. Therefore, both Suro servers and applications that use Suro client can be auto scaled.
  • Suro server uses EBS and file-backed queues to minimize message loss during unexpected EC2 termination.
  • Suro server is able to push messages to arbitrary consumers at runtime with the help of Netflix Archaius. Users can declaratively configure Suro server at runtime to dispatch events to multiple destinations, such as Apache Kafka, SQS, S3, and any HTTP endpoint. The dispatching can be done in either batches or real time.


Summary

Suro has been the backbone of the data pipeline at Netflix for a few years and has evolved to handle many of the typical use cases that any Big Data infrastructure aims to solve.
In this article, we have described the top level architecture, the use cases, and some of the components that form the overall data pipeline infrastructure at Netflix. We are happy to open source Suro and welcome inputs, comments and involvement from the open source community.


If building critical big data infrastructure is your interest and passion, please take a look at http://jobs.netflix.com.

Tuesday, November 20, 2012

Announcing Blitz4j - a scalable logging framework

By Karthikeyan Ranganathan

We are proud to announce Blitz4j , a critical component of the Netflix logging infrastructure that helps Netflix achieve high volume logging without affecting scalability of the applications.



What is Blitz4j?


Blitz4j is a logging framework built on top of log4j to reduce multithreaded contention and enable highly scalable logging without affecting application performance characteristics.
At Netflix, Blitz4j is used to log billions of events for monitoring, business intelligence reporting, debugging and other purposes. Blitz4j overcomes traditional log4j bottlenecks and comes built with a highly scalable and customizable asynchronous framework. Blitz4j also comes with the ability to convert the existing log4j appenders to use the asynchronous model without changing the existing log4j configurations.
Blitz4j makes runtime reconfigurations of log4j pretty easy. Blitz4j also tries to mitigate data loss and provides a way to summarize the log information during log storms.

Why is scalable logging important?


Logging is a critical part of any application infrastructure. At Netflix, we collect data for monitoring, business intelligence reporting etc. There is also a need to turn on finer grain of logging level for debugging customer issues.
In addition, in a service-oriented architecture, you depend on other central services and if those services break unexpectedly, your applications tend to log orders of magnitude higher than normal. This is where the scalability of the logging infrastructure comes to the fore. Any scalable logging infrastructure should be able to handle these kind of log storms providing useful information about the breakages without affecting the application performance.

History of Blitz4j


At Netflix, log4j has been used as a logging framework for a few years. It had worked fine for us, until the point where there was a real need to log lots of data. When our traffic increased and when the need for per-instance logging went up, log4j's frailties started to get exposed.

Problems with Log4j

Contended Synchronization with Root Logger
Log4j follows a hierarchical model and that makes it easy to turn off/on logging based on a package or a class level (if your logger definition follows that model). In this model, root logger is at the top of the hierarchy. In most cases, all loggers have to get access the root logger to log to the appenders configured there. One of the biggest problems here is that locking is needed on the root logger to write to the appenders. For a high traffic application that logs lot of data this is a big contention point as all application threads have to synchronize on the root logger.

public
  void callAppenders(LoggingEvent event) {
    int writes = 0;
   for(Category c = this; c != null; c=c.parent) {
     // Protected against simultaneous call to addAppender, removeAppender,...
      synchronized(c) {
    if(c.aai != null) {
      writes += c.aai.appendLoopOnAppenders(event);
    }
    if(!c.additive) {
      break;
    }
      }
    }

    if(writes == 0) {
      repository.emitNoAppenderWarning(this);
    }
  }


This severely limits the application scalability. Even if the critical section is executed quickly, this is a huge bottleneck in high volume logging applications.
The reason for the lock seems to be for protection against potential change in the list of appenders which should be a rare event. For us, the thread dump has exposed this contention numerous times.
Asynchronous appender to the rescue?
The longer the time spent in logging to the appender, more threads wait on the logging to complete. Any buffering here helps the scalability of the application tremendously. Some log4j appenders (such as File Appender) comes with the ability to buffer the data and that helps this problem quite a bit. The built in log4j asynchronous appender alleviates this problem quite a bit, but it still does not remove this synchronization. For us, thread dumps revealed another point of contention when logging to the appender with the use of asynchronous appender.
It was quite clear that the built-in asynchronous appender was less scalable because of this synchronization.

public
  synchronized 
  void doAppend(LoggingEvent event) {
    if(closed) {
      LogLog.error("Attempted to append to closed appender named ["+name+"].");
      return;
    }
    
    if(!isAsSevereAsThreshold(event.getLevel())) {
      return;
    }

    Filter f = this.headFilter;
    
    FILTER_LOOP:
    while(f != null) {
      switch(f.decide(event)) {
      case Filter.DENY: return;
      case Filter.ACCEPT: break FILTER_LOOP;
      case Filter.NEUTRAL: f = f.getNext();
      }
    }
    
    this.append(event);    
  }


Deadlock Vulnerability
This double locking (root logger and appender) also makes the application vulnerable to deadlocks if your appender by any chance tries to take a lock on a resource and if that resource tries to log to the appender at the same time.
Locking on Logger Cache
In log4j, loggers are cached in a Hashtable and that needs to be locked for any retrieval of a cached logger. When you want to change the log4j settings dynamically, there are 2 steps in the process.
  1. Reset and empty out all current log4j configurations
  2. Load all configurations including new configurations
During the reset process, locks have to be held on both the cache and the individual loggers. If any of the appenders tried to look up the logger from the cache at the same time,we have the classic case of locks trying to be held in opposite directions and the chance of a deadlock.

public
  void shutdown() {
    Logger root = getRootLogger();

    // begin by closing nested appenders
    root.closeNestedAppenders();

    synchronized(ht) {
      Enumeration cats = this.getCurrentLoggers();
      while(cats.hasMoreElements()) {
    Logger c = (Logger) cats.nextElement();
    c.closeNestedAppenders();
      }

Why Blitz4j?


Central to all the contention and the deadlock vulnerabilities is the locking model in log4j. If the log4j used any of the concurrent data structures with JDK 1.5 and above, most of the problems would be solved. That is exactly what blitz4j does.
Blitz4j overrides key parts of the log4j architecture to remove the locks and replace them with concurrent data structures.Blitz4j puts the emphasis more on application performance and stability rather than accuracy in logging. This means Blitz4j leans more towards the asynchronous model of logging and tries to make the logging useful by retaining the time-order of logging messages.
While the log4j's built-in asynchronous appender is similar in functionality to the one offered by blitz4j, Blitz4j comes with the following differences
  1. Remove all critical synchronizations with concurrent data structures.
  2. Extreme configurability in terms of in-memory buffer and worker threads
  3. More isolation of application threads from logging threads by replacing the wait-notify model with an executor pool model.
  4. Better handling of log messages during log storms with configurable summary.
Apart from the above, Blitz4j also provides the following.
  1. Ability to dynamically configure log4j levels for debugging production problems without affecting the application performance.
  2. Automatic conversion of any log4j appender to the asynchronous model statically or at runtime.
  3. Realtime metrics regarding performance using Servo and dynamic configurability using Archaius.
If your application is equipped with enough memory, it is possible to achieve both application and logging performance without any logging data loss. The power of this model comes to the fore during log storms when the critical dependencies break unexpectedly causing orders of magnitude increase in logging.

Why not use LogBack?

For a new project, LogBack may be an apt choice. For existing projects, there seems to be considerable amount of work to achieve the promised scalability. Besides, Blitz4j has stood the test of time and scrutiny at Netflix and given the familiarity and ubiquity of log4j, it is our architectural choice here at Netflix.

Blitz4j Performance

The  graph below from a couple of our streaming servers which logs about 300-500 lines a second, gives an indication of performance of blitz4j (with asynchronous appender) as compared to log4j (without asynchronous appender).
In a steady state, the latter is atleast 3 times more expensive than blitz4j. There are numerous spikes that happen with the log4j implementation that are due to the synchronizations talked about earlier.


Other things we observed:
When log4j (without synchronizations removed) is used with asynchronous appender, it scales much better (graph not included here), but it just takes that much higher amount of logging for the spikes to show up.

We have also observed with blit4j even with very high amount of logging turned on, the application response times remained unaffected when compared to log4j (with synchronization) where the response times degraded rapidly when the amount of logging increased.

References

 Blitz4j Source
Blitz4j Wiki


If building critical infrastructure components like this, for a service that millions of people use world wide, excites you, take a look at http://jobs.netflix.com.