Showing posts with label real-time streaming. Show all posts
Showing posts with label real-time streaming. Show all posts

Wednesday, April 27, 2016

Kafka Inside Keystone Pipeline

This is the second blog of our Keystone pipeline series. Please refer to the first part for overview and evolution of the Keystone pipeline. In summary, the Keystone pipeline is a unified event publishing, collection, and routing infrastructure for both batch and stream processing.


We have two sets of Kafka clusters in Keystone pipeline: Fronting Kafka and Consumer Kafka. Fronting Kafka clusters are responsible for getting the messages from the producers which are virtually every application instance in Netflix. Their roles are data collection and buffering for downstream systems. Consumer Kafka clusters contain a subset of topics routed by Samza for real-time consumers.

We currently operate 36 Kafka clusters consisting of 4,000+ broker instances for both Fronting Kafka and Consumer Kafka. More than 700 billion messages are ingested on an average day. We are currently transitioning from Kafka version 0.8.2.1 to 0.9.0.1.

Design Principles

Given the current Kafka architecture and our huge data volume, to achieve lossless delivery for our data pipeline is cost prohibitive in AWS EC2. Accounting for this, we’ve worked with teams that depend upon our infrastructure to arrive at an acceptable amount of data loss, while balancing cost.  We’ve achieved a daily data loss rate of less than 0.01%. Metrics are gathered for dropped messages so we can take action if needed.

The Keystone pipeline produces messages asynchronously without blocking applications. In case a message cannot be delivered after retries, it will be dropped by the producer to ensure the availability of the application and good user experience. This is why we have chosen the following configuration for our producer and broker:

  • acks = 1
  • block.on.buffer.full = false
  • unclean.leader.election.enable = true
Most of the applications in Netflix use our Java client library to produce to Keystone pipeline. On each instance of those applications, there are multiple Kafka producers, with each producing to a Fronting Kafka cluster for sink level isolation. The producers have flexible topic routing and sink configuration which are driven via dynamic configuration that can be changed at runtime without having to restart the application process. This makes it possible for things like redirecting traffic and migrating topics across Kafka clusters. For non-Java applications, they can choose to send events to Keystone REST endpoints which relay messages to fronting Kafka clusters.

For greater flexibility, the producers do not use keyed messages. Approximate message ordering is re-established in the batch processing layer (Hive / Elasticsearch) or routing layer for streaming consumers.

We put the stability of our Fronting Kafka clusters at a high priority because they are the gateway for message injection. Therefore we do not allow client applications to directly consume from them to make sure they have predictable load.

Challenges of running Kafka in the Cloud

Kafka was developed with data center as the deployment target at LinkedIn. We have made notable efforts to make Kafka run better in the cloud.

In the cloud, instances have an unpredictable life-cycle and can be terminated at anytime due to hardware issues. Transient networking issues are expected. These are not problems for stateless services but pose a big challenge for a stateful service requiring ZooKeeper and a single controller for coordination.

Most of our issues begin with outlier brokers. An outlier may be caused by uneven workload, hardware problems or its specific environment, for example, noisy neighbors due to multi-tenancy. An outlier broker may have slow responses to requests or frequent TCP timeouts/retransmissions. Producers who send events to such a broker will have a good chance to exhaust their local buffers while waiting for responses, after which message drop becomes a certainty. The other contributing factor to buffer exhaustion is that Kafka 0.8.2 producer doesn’t support timeout for messages waiting in buffer.

Kafka’s replication improves availability. However, replication leads to inter-dependencies among brokers where an outlier can cause cascading effect. If an outlier slows down replication, replication lag may build up and eventually cause partition leaders to read from the disk to serve the replication requests. This slows down the affected brokers and eventually results in producers dropping messages due to exhausted buffer as explained in previous case.

During our early days of operating Kafka, we experienced an incident where producers were dropping a significant amount of messages to a Kafka cluster with hundreds of instances due to a ZooKeeper issue while there was little we could do. Debugging issues like this in a small time window with hundreds of brokers is simply not realistic.

Following the incident, efforts were made to reduce the statefulness and complexity for our Kafka clusters, detect outliers, and find a way to quickly start over with a clean state when an incident occurs.

Kafka Deployment Strategy

The following are the key strategies we used for deploying Kafka clusters

  • Favor multiple small Kafka clusters as opposed to one giant cluster. This reduces the operational complexity for each cluster. Our largest cluster has less than 200 brokers.
  • Limit the number of partitions in each cluster. Each cluster has less than 10,000 partitions. This improves the availability and reduces the latency for requests/responses that are bound to the number of partitions.
  • Strive for even distribution of replicas for each topic. Even workload is easier for capacity planning and detection of outliers.
  • Use dedicated ZooKeeper cluster for each Kafka cluster to reduce the impact of ZooKeeper issues.

The following table shows our deployment configurations.


Fronting Kafka Clusters
Consumer Kafka Clusters
Number of clusters
24
12
Total number of instances
3,000+
900+
Instance type
d2.xl
i2.2xl
Replication factor
2
2
Retention period
8 to 24 hours
2 to 4 hours

Kafka Failover

We automated a process where we can failover both producer and consumer (router) traffic to a new Kafka cluster when the primary cluster is in trouble. For each fronting Kafka cluster, there is a cold standby cluster with desired launch configuration but minimal initial capacity. To guarantee a clean state to start with, the failover cluster has no topics created and does not share the ZooKeeper cluster with the primary Kafka cluster. The failover cluster is also designed to have replication factor 1 so that it will be free from any replication issues the original cluster may have.

When failover happens, the following steps are taken to divert the producer and consumer traffic:

  • Resize the failover cluster to desired size.
  • Create topics on and launch routing jobs for the failover cluster in parallel.
  • (Optionally) Wait for leaders of partitions to be established by the controller to minimize the initial message drop when producing to it.
  • Dynamically change the producer configuration to switch producer traffic to the failover cluster.

The failover scenario can be depicted by the following chart:
With the complete automation of the process, we can do failover in less than 5 minutes. Once failover has completed successfully, we can debug the issues with the original cluster using logs and metrics. It is also possible to completely destroy the cluster and rebuild with new images before we switch back the traffic. In fact, we often use failover strategy to divert the traffic while doing offline maintenance. This is how we are upgrading our Kafka clusters to new Kafka version without having to do the rolling upgrade or setting the inter-broker communication protocol version.

Development for Kafka

We developed quite a lot of useful tools for Kafka. Here are some of the highlights:

Producer sticky partitioner

This is a special customized partitioner we have developed for our Java producer library. As the name suggests, it sticks to a certain partition for producing for a configurable amount of time before randomly choosing the next partition. We found that using sticky partitioner together with lingering helps to improve message batching and reduce the load for the broker. Here is the table to show the effect of the sticky partitioner:


partitioner
batched records
per request
broker cpu utilization [1]
random without lingering
1.25
75%
sticky without lingering
2.0
50%
sticky with 100ms lingering
15
33%

[1] With an load of 10,000 msgs / second per broker and 1KB per message

Rack aware replica assignment

All of our Kafka clusters spans across three AWS availability zones. An AWS availability zone is conceptually a rack. To ensure availability in case one zone goes down, we developed the rack (zone) aware replica assignment so that replicas for the same topic are assigned to different zones. This not only helps to reduce the risk of a zone outage, but also improves our availability when multiple brokers co-located in the same physical host are terminated due to host problems. In this case, we have better fault tolerance than Kafka’s N - 1 where N is the replication factor.

The work is contributed to Kafka community in KIP-36 and Apache Kafka Github Pull Request #132.

Kafka Metadata Visualizer

Kafka’s metadata is stored in ZooKeeper. However, the tree view provided by Exhibitor is difficult to navigate and it is time consuming to find and correlate information.

We created our own UI to visualize the metadata. It provides both chart and tabular views and uses rich color schemes to indicate ISR state. The key features are the following:

  • Individual tab for views for brokers, topics, and clusters
  • Most information is sortable and searchable
  • Searching for topics across clusters
  • Direct mapping from broker ID to AWS instance ID
  • Correlation of brokers by the leader-follower relationship

The following are the screenshots of the UI:





Monitoring

We created a dedicated monitoring service for Kafka. It is responsible for tracking:

  • Broker status (specifically, if it is offline from ZooKeeper)
  • Broker’s ability to receive messages from producers and deliver messages to consumers. The monitoring service acts as both producer and consumer for continuous heartbeat messages and measures the latency of these messages.
  • For old ZooKeeper based consumers, it monitors the partition count for the consumer group to make sure each partition is consumed.
  • For Keystone Samza routers, it monitors the checkpointed offsets and compares with broker’s log offsets to make sure they are not stuck and have no significant lag.

In addition, we have extensive dashboards to monitor traffic flow down to a topic level and most of the broker’s  metrics.

Future plan

We are currently in process of migrating to Kafka 0.9, which has quite a few features we want to use including new consumer APIs, producer message timeout and quotas. We will also move our Kafka clusters to AWS VPC and believe its improved networking (compared to EC2 classic) will give us an edge to improve availability and resource utilization.

We are going to introduce a tiered SLA for topics. For topics that can accept minor loss, we are considering using one replica. Without replication, we not only save huge on bandwidth, but also minimize the state changes that have to depend on the controller. This is another step to make Kafka less stateful in an environment that favors stateless services. The downside is the potential message loss when a broker goes away. However, by leveraging the producer message timeout in 0.9 release and possibly AWS EBS volume, we can mitigate the loss.

Stay tuned for future Keystone blogs on our routing infrastructure, container management, stream processing and more!
By Real-Time Data Infrastructure Team
Allen Wang, Steven Wu, Monal Daxini, Manas Alekar, Zhenzhong Xu, Jigish Patel, Nagarjun Guraja, Jonathan Bond, Matt Zimmer, Peter Bakas, Kunal Kundaje

Monday, March 14, 2016

Stream-processing with Mantis

Back in January of 2014 we wrote about the need for better visibility into our complex operational environments.  The core of the message in that post was about the need for fine-grained, contextual and scalable insights into the experiences of our customers and behaviors of our services.  While our execution has evolved somewhat differently from our original vision, the underlying principles behind that vision are as relevant today as they were then.  In this post we’ll share what we’ve learned building Mantis, a stream-processing service platform that’s processing event streams of up to 8 million events per second and running hundreds of stream-processing jobs around the clock.  We’ll describe the architecture of the platform and how we’re using it to solve real-world operational problems.

Why Mantis?

There are more than 75 million Netflix members watching 125 million hours of content every day in over 190 countries around the world.  To provide an incredible experience for our members, it’s critical for us to understand our systems at both the coarse-grained service level and fine-grained device level.  We’re good at detecting, mitigating, and resolving issues at the application service level - and we’ve got some excellent tools for service-level monitoring - but when you get down to the level of individual devices, titles, and users, identifying and diagnosing issues gets more challenging.

We created Mantis to make it easy for teams to get access to realtime events and build applications on top of them.  We named it after the Mantis shrimp, a freakish yet awesome creature that is both incredibly powerful and fast.  The Mantis shrimp has sixteen photoreceptors in its eyes compared to humans’ three.  It has one of the most unique visual systems of any creature on the planet.  Like the shrimp, the Mantis stream-processing platform is all about speed, power, and incredible visibility.  

So Mantis is a platform for building low-latency, high throughput stream-processing apps but why do we need it?  It’s been said that the Netflix microservices architecture is a metrics generator that occasionally streams movies.  It’s a joke, of course, but there’s an element of truth to it; our systems do produce billions of events and metrics on a daily basis.  Paradoxically, we often experience the problem of having both too much data and too little at the same time.  Situations invariably arise in which you have thousands of metrics at your disposal but none are quite what you need to understand what’s really happening.  There are some cases where you do have access to relevant metrics, but the granularity isn’t quite good enough for you to understand and diagnose the problem you’re trying to solve.  And there are still other scenarios where you have all the metrics you need, but the signal-to-noise ratio is so high that the problem is virtually impossible to diagnose.  Mantis enables us to build highly granular, realtime insights applications that give us deep visibility into the interactions between Netflix devices and our AWS services.  It helps us better understand the long tail of problems where some users, on some devices, in some countries are having problems using Netflix.

By making it easier to get visibility into interactions at the device level, Mantis helps us “see” details that other metrics systems can’t.  It’s the difference between 3 photoreceptors and 16.

A Deeper Dive

With Mantis, we wanted to abstract developers away from the operational overhead associated with managing their own cluster of machines.  Mantis was built from ground up to be cloud native.  It manages a cluster of EC2 servers that is used to run stream-processing jobs.  Apache Mesos is used to abstract the cluster into a shared pool of computing resources.  We built, and open-sourced, a custom scheduling library called Fenzo to intelligently allocate these resources among jobs.

Architecture Overview

The Mantis platform comprises a master and an agent cluster.  Users submit stream-processing applications as jobs that run as one or more workers on the agent cluster.  The master consists of a Resource Manager that uses Fenzo to optimally assign resources to a jobs’ workers.  A Job Manager embodies the operational behavior of a job including metadata, SLAs, artifact locations, job topology and life cycle.

The following image illustrates the high-level architecture of the system.

Mantis Jobs

Mantis provides a flexible model for defining a stream-processing job. A mantis job can be defined as single-stage for basic transformation/aggregation use cases or multi-stage for sharding and processing high-volume, high-cardinality event streams.

There are three main parts to a Mantis job. 
  • The source is responsible for fetching data from an external source
  • One or more processing stages which are responsible for processing incoming event streams using high order RxJava functions
  • The sink to collect and output the processed data
RxNetty provides non-blocking access to the event stream for a job and is used to move data between its stages.

To give you a better idea of how a job is structured, let's take a look at a typical ‘aggregate by group’ example.


Imagine that we are trying to process logs sent by devices to calculate error rates per device type.  The job is composed of three stages. The first stage is responsible for fetching events from a device log source job and grouping them based on device ID. The grouped events are then routed to workers in stage 2 such that all events for the same group (i.e., device ID) will get routed to the same worker.  Stage 2 is where stateful computations like windowing and reducing - e.g., calculating error rate over a 30 second rolling window - are performed.  Finally the aggregated results for each device ID are collected by Stage 3 and made available for dashboards or other applications to consume.

Job Chaining

One of the unique features of Mantis is the ability to chain jobs together.  Job chaining allows for efficient data and code reuse.  The image below shows an example of an anomaly detector application composed of several jobs chained together.  The anomaly detector streams data from a job that serves Zuul request/response events (filtered using a simple SQL-like query) along with output from a “Top N” job that aggregates data from several other source jobs.

Scaling in Action

At Netflix the amount of data that needs to be processed varies widely based on the time of the day.  Running with peak capacity all the time is expensive and unnecessary. Mantis autoscales both the cluster size and the individual jobs as needed.

The following chart shows how Fenzo autoscales the Mesos worker cluster by adding and removing EC2 instances in response to demand over the course of a week.

And the chart below shows an individual job’s autoscaling in action, with additional workers being added or removed based on demand over a week.


UI for Self-service, API for Integration

Mantis sports a dedicated UI and API for configuring and managing jobs across AWS regions.  Having both a UI and API improves the flexibility of the platform.  The UI gives users the ability to quickly and manually interact with jobs and platform functionality while the API enables easy programmatic integration with automated workflows.

The jobs view in the UI, shown below, lets users quickly see which jobs are running across AWS regions along with how many resources the jobs are consuming.

Each job instance is launched as part of a job cluster, which you can think of as a class definition or template for a Mantis job.  The job cluster view shown in the image below provides access to configuration data along with a view of running jobs launched from the cluster config. From this view, users are able to update cluster configurations and submit new job instances to run.

How Mantis Helps Us

Now that we’ve taken a quick look at the overall architecture for Mantis, let’s turn our attention to how we’re using it to improve our production operations.  Mantis jobs currently process events from about 20 different data sources including services like Zuul, API, Personalization, Playback, and Device Logging to name a few.

Of the growing set of applications built on these data sources, one of the most exciting use cases we’ve explored involves alerting on individual video titles across countries and devices.

One of the challenges of running a large-scale, global Internet service is finding anomalies in high-volume, high-cardinality data in realtime.  For example, we may need access to fine-grained insights to figure out if there are playback issues with House of Cards, Season 4, Episode 1 on iPads in Brazil.  To do this we have to track millions of unique combinations of data (what we call assets) all the time, a use case right in Mantis’ wheelhouse.

Let’s consider this use case in more detail.  The rate of events for a title asset (title * devices * country) shows a lot of variation.  So a popular title on a popular device can have orders of magnitude more events than lower usage title and device combinations.  Additionally for each asset, there is high variability in event rate based on the time of the day.  To detect anomalies, we track rolling windows of unique events per asset.  The size of the window and alert thresholds vary dynamically based on the rate of events.  When the percentage of anomalous events exceeds the threshold, we generate an alert for our playback and content platform engineering teams.  This approach has allowed us to quickly identify and correct problems that would previously go unnoticed or, best case, would be caught by manual testing or be reported via customer service.

Below is a screen from an application for viewing playback stats and alerts on video titles. It surfaces data that helps engineers find the root cause for errors.

In addition to alerting at the individual title level, we also can do realtime alerting on our key performance indicator: SPS.  The advantage of Mantis alerting for SPS is that it gives us the ability to ratchet down our time to detect (TTD) from around 8 minutes to less than 1 minute.  Faster TTD gives us a chance to resolve issues faster (time to recover, or TTR), which helps us win more moments of truth as members use Netflix around the world.

Where are we going?

We’re just scratching the surface of what’s possible with realtime applications, and we’re exploring ways to help more teams harness the power of stream-processing.  For example, we’re working on improving our outlier detection system by integrating Mantis data sources, and we’re working on usability improvements to get teams up and running more quickly using self-service tools provided in the UI.

Mantis has opened up insights capabilities that we couldn’t easily achieve with other technologies and we’re excited to see stream-processing evolve as an important and complementary tool in our operational and insights toolset at Netflix.  

If the work described here sounds exciting to you, head over to our jobs page; we’re looking for great engineers to join us on our quest to reinvent TV! 

by Ben Schmaus, Chris Carey, Neeraj Joshi, Nick Mahilani, and Sharma Podila





Monday, June 15, 2015

NTS: Real-time Streaming for Test Automation

by Peter Hausel and Jwalant Shah

Netflix Test Studio



Netflix members can enjoy instant access to TV shows & Movies on over 1400 different device/OS permutations. Assessing long-duration playback quality and delivering a great member experience on such a diverse set of playback devices presented a huge challenge to the team.


Netflix Test Studio (NTS) was created with the goal of creating a consistent way for internal and external developers to deploy and execute tests. This is achieved by abstracting device differences. NTS also provides a standard set of tools for assessing the responsiveness and quality of the overall experience. NTS now runs over 40,000 long-running tests each day on over 600 devices around the world.


Overview


NTS is a cloud-based automation framework that lets you remote control most Netflix Ready Devices. In this post we’ll focus on two key aspects of the framework:
  • Collect test results in near-realtime.
    • A highly event driven architecture allows us to accomplish this: JSON snippets sent from the single page UI to the device and JavaScript listeners on the device firing back events. We also have a requirement to be able to play back events as they happened, just like a state machine.
  • Allow testers to interact with both the device and various Netflix services during execution.
    • Integrated tests require the control of the test execution stream in order to simulate real-world conditions. We want to simulate failures, pause, debug and resume during test execution.


A typical user interface for Test Execution using NTS














A Typical NTS Test:



Architecture overview

Early implementation of NTS had a relatively simplistic design: hijack a Netflix Ready Device for automation via various redirection methods, then a Test Harness (test executor) would coordinate the execution with the help of a central, public facing Controller service. Eventually, we would get data out from the device via long polling, validate steps, and bubble up validation results back to the client. We built separate clusters of this architecture for each Netflix SDK version.

Original Architecture using Long Polling

nts_legacy_with devices.png

Event playback is not supported

This model worked relatively well in the beginning. However, as the number of supported devices, SDK’s and test cases grew, we started seeing the limitations of this approach: messages were sometimes lost, there was no way of knowing what exactly happened, error messages were misleading, tests were hard to monitor and playback real-time, finally, maintaining almost identical clusters with different test content and SDK versions was introducing an additional maintenance burden as well.

In the next iteration of the tool, we removed the Controller service and most of the polling by introducing a WebSockets proxy (built on top of JSR-356) that was sitting between the clients and Test Executors. We also introduced JSON-RPC as the command protocol.

Updated Version - Near-Realtime (Almost There)

nts_mono_devices.png

Pub/Sub without event playback support

  • Test Executor submits events in a time series fashion to a Websocket Bus which terminates at Dispatcher.
  • Client connects to a Dispatcher with session Id information. One-to-many relationship between Dispatcher and TestExecutors.
  • Dispatcher instance keeps an internal lookup of test execution session id’s to Websocket connections to Test Executors and delivers messages received over those connections to the Client.

This approach solved most of our issues: fewer indirections, real-time streaming capabilities, push-based design. There were only two remaining issues: message durability was still not supported and more importantly, the WebSockets proxy was difficult to scale out due to its stateful nature.

At this point, we started looking into Apache Kafka to replace the internal WebSocket layer with a distributed pub/sub and message queue solution.

Current version - Kafkants_kafka_messaging_devices.png

Pub/Sub with event playback support
A few interesting properties of this pub/sub system:
  • Dispatcher is responsible for handling client requests to subscribe to Test Execution events stream.
  • Kafka provides a scalable message queue between Test Executor and Dispatcher. Since each session id is mapped to a particular partition and each message sent to client includes the current Kafka offset, we can now guarantee reliable delivery of messages to clients with support for replay of messages in case of network reconnection.
  • Multiple clients can subscribe to the same stream without additional overhead and admin users can view/monitor remote users test execution in real time.
  • The same stream is consumed for analytics purposes as well.
  • Throughput/Latency: during load testing, we could get ~90-100ms latency per message consistently with 100 concurrent users (our test setup was 6 brokers deployed on 6 d2.xlarge instances). In our production system, latency is often lower due to batching.

Where do we go from here?

With HTTP/2 on the horizon, it’s unclear where WebSockets will fit in the long-run. That said, if you need a TCP-based, persistent channel now, you don’t have a better option. While we are actively migrating away from JSR-356 (and Tomcat Websocket) to RxNetty due to numerous issues we ran into, we continue to invest more in WebSockets.

As for Kafka, the transition was not problem free either. But Kafka solved some very hard problems for us (distributed event bus, message durability, consuming a stream both as a distributed queue and pub/sub etc.) and more importantly, it opened up the door for further decoupling. As a result, we are moving forward with our strategic plan to use this technology as the unified backend for our data pipeline needs.

(Engineers who worked on this project: Jwalant Shah, Joshua Hua, Matt Sun)