Thursday, January 16, 2014
Improving Netflix’s Operational Visibility with Real-Time Insight Tools

Thursday, January 9, 2014
S3mper: Consistency in the Cloud
The consistency guarantees for S3 vary by region and operation (details here), but in general, any list or read operation is susceptible to inconsistent information depending on preceding operations. For basic data archival, consistency is not a concern. However, in a data-centric computing environment where information flows through a complex workflow of computations and transformations, an eventually consistent model can cause problems ranging from insidious data loss to catastrophic job failure.
Netflix is pleased to announce that s3mper is now released as open source under the Apache License v2.0. We hope that the availability of this library will inspire constructive discussion focusing on how to better manage consistency at scale with the Hadoop stack across the many cloud offerings currently available.
How Inconsistency Impacts Processing
Consider a workflow of two Pig jobs, where Pig-2 is triggered by the completion of Pig-1 and immediately lists the output directories of the previous job. If the S3 listing is incomplete when the second job starts, it will proceed with incomplete data. This is particularly problematic because, as we stated earlier, there is no indication that a problem occurred. The integrity of the resulting data is entirely at the mercy of how consistent the S3 listing was when the second job started.
A variety of other scenarios may result in consistency issues, but inconsistent listing is our primary concern: if the input data is incomplete, there is no indication that anything is wrong with the result. It is obviously noticeable when the results vary significantly from long-standing patterns or emit no data at all, but if only a small portion of the input is missing, the results will appear convincing. Data loss at the beginning of a pipeline has a cascading effect, leaving the end product wildly inaccurate. Due to the potential impact, it is essential to understand the risks and the methods for mitigating loss of data integrity.
Approaches to Managing Consistency
The Impractical
Staging Data
Consistency through Convention
Conventions can be used to eliminate some cases of inconsistency. Read and list inconsistency caused by overwriting the same location can corrupt data: a listing may mix old versions of data with new, producing an amalgam of two incomplete datasets. Eliminating update inconsistency is achievable by imposing a convention where the same location is never overwritten. Here at Netflix, we encourage the use of a batching pattern, where results are written into partitioned batches and the Hive metastore only references the valid batches. This approach removes the possibility of inconsistency due to update or delete. For AWS regions other than US Standard, which provide "read-after-write" consistency for new objects, this approach may be sufficient, but it relies on strict adherence to the convention.
Secondary Index
S3 is designed with an eventually consistent index, which is understandable in the context of the scale and the guarantees it provides. At smaller scale, it is possible to achieve consistency through use of a consistent secondary index to catalog file metadata while backing the raw data on S3. This approach becomes more difficult as the scale increases, but as long as the secondary index can handle the request rate and still provide guaranteed consistency, it will suffice. There are costs to this approach: the probability of data loss and the overall complexity increase, while performance degrades due to the reliance on two separate systems.
S3mper: A Hybrid Approach
S3mper is an experimental approach to tracking file metadata through use of a secondary index that provides consistent reads and writes. The intent is to identify when an S3 list operation returns inconsistent results and provide options to respond. We implemented s3mper using aspects to advise methods on the Hadoop FileSystem interface and track file metadata with DynamoDB as the secondary index. The reason we chose DynamoDB is that it provides capabilities similar to S3 (e.g. high availability, durability through replication), but also adds consistent operations and high performance.
The key features s3mper provides include (see here for more detailed design and options):
- Recovery: When an inconsistent listing is identified, s3mper will optionally delay the listing and retry until consistency is achieved. This delays a job only long enough for the data to become available, without unnecessarily impacting job performance. (A conceptual sketch of this behavior follows the list.)
- Notification: If a consistent listing cannot be achieved within the configured delay, a notification is sent immediately and a determination can be made as to whether to kill the job or let it proceed with incomplete data.
- Reporting: A variety of events are sent to track the number of recoveries, files missed, what jobs were affected, etc.
- Configurability: Options are provided to control how long a job should wait, how frequently to recheck a listing, and whether to fail a job if the listing is inconsistent.
- Modularity: The implementations for the metastore and notifications can be overridden based on the environment and services at your disposal.
- Administration: Utilities are provided for inspecting the metastore and resolving conflicts between the secondary index in DynamoDB and the S3 index.
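To make the recovery and notification behavior concrete, here is a conceptual sketch in Clojure. S3mper itself is a Java library that uses aspects around the Hadoop FileSystem and DynamoDB as its secondary index, so none of these names reflect its actual API; the option names are illustrative stand-ins for the configuration described above.

;; Conceptual sketch only: s3mper is implemented in Java; these names are
;; illustrative, not its API.
(defn consistent-listing
  "Retry an S3 listing until it contains every path recorded in the
  consistent secondary index, or until the configured delay is exhausted."
  [list-s3 expected-paths {:keys [max-wait-ms recheck-ms fail-on-timeout?]}]
  (loop [waited 0]
    (let [listed (set (list-s3))]
      (cond
        ;; Every expected file is visible: the listing is consistent.
        (every? listed expected-paths) listed
        ;; Still inconsistent but within the allowed delay: wait and recheck.
        (< waited max-wait-ms) (do (Thread/sleep recheck-ms)
                                   (recur (+ waited recheck-ms)))
        ;; Delay exhausted: either fail the job or notify and proceed with
        ;; the (possibly incomplete) listing.
        fail-on-timeout? (throw (ex-info "Inconsistent S3 listing"
                                         {:missing (remove listed expected-paths)}))
        :else (do (println "WARNING: proceeding with inconsistent listing")
                  listed)))))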
S3mper is not intended to solve every possible case where inconsistency can occur. Deleting data from S3 outside of the Hadoop stack will cause the secondary index to diverge and jobs to be delayed unnecessarily. Directory support is also limited, so recursive listings are still prone to inconsistency; but since we currently derive all our data locations from a Hive metastore, this does not impact us. While this library is still in its infancy and does not support every case, using it in combination with the conventions discussed earlier alleviates the concern for our workflows and allows for further investigation and development of new capabilities.
S3mper has been running in production at Netflix for a few months, and the result is an interesting dataset with respect to consistency. For context, Netflix operates out of the US Standard region, where we run tens of thousands of Pig, Hive, and Hadoop jobs across multiple clusters of varying size and process hundreds of terabytes of data every day. The number of listings is hard to estimate because any given job will perform several listings depending on the number of partitions processed, but s3mper tracks every interaction Hadoop has with S3 across all clusters and datasets. At any given time, DynamoDB contains metadata on millions of files within our configured 24-hour sliding window of consistency. We keep track of metrics on how frequently s3mper recovers a listing (i.e. postpones a job until it receives a complete listing) and how often the delay is exceeded, resulting in a job executing with data acquired through an inconsistent listing.
It is clear from these numbers that inconsistent listings make up a tiny fraction of all S3 operations. In many cases all files become available within a few minutes and s3mper can recover the listing. In cases where a listing is not recovered, a notification goes out to the job owner, who can determine whether a rerun is necessary. We can only speculate about the variation seen over time, because S3 is a shared resource and we have little knowledge of the underlying implementation.
After investigating a sample of affected jobs, patterns do emerge that appear to result in increased probability of inconsistent listing. For example, a stage within a single job that produces tens of thousands of files and reads them immediately in the next stage appears to have a higher likelihood of consistency issues. We also make use of versioned buckets, which track history through use of delete markers. Jobs that experience slower consistency often overwrite the same location repeatedly, which may have some correlation to how quickly an updated listing is available. These observations are based purely on the types of queries and access patterns that have resulted in inconsistent listings as reported by s3mper.
With the petabytes of data we store in S3 and the several million operations we perform each day, our experience with eventual consistency shows that only a very small percentage of jobs are impacted, but the severity of inaccurate results warrants attention. Being able to identify when a consistency issue occurs not only gives us confidence in the resulting data, but also helps rule out consistency as a cause when diagnosing problems elsewhere in the system. There is still more to be learned, and we will continue to investigate ways to better identify and resolve consistency issues, but s3mper is a solution we use in production and it will continue to provide insight into these areas.
Thursday, January 2, 2014
Introducing PigPen: Map-Reduce for Clojure
It is our pleasure to release PigPen to the world today. PigPen is map-reduce for Clojure. It compiles to Apache Pig, but you don’t need to know much about Pig to use it.
What is PigPen?
- A map-reduce language that looks and behaves like clojure.core
- The ability to write map-reduce queries as programs, not scripts
- Strong support for unit tests and iterative development
Note: If you are not familiar at all with Clojure, we strongly recommend that you try a tutorial here, here, or here to understand some of the basics.
Really, yet another map-reduce language?
If you know Clojure, you already know PigPen
The primary goal of PigPen is to take language out of the equation. PigPen operators are designed to be as close as possible to the Clojure equivalents. There are no special user defined functions (UDFs). Define Clojure functions, anonymously or named, and use them like you would in any Clojure program.
Here’s the proverbial word count:
(require '[pigpen.core :as pig])
(defn word-count [lines]
  (->> lines
       (pig/mapcat #(-> % first
                        (clojure.string/lower-case)
                        (clojure.string/replace #"[^\w\s]" "")
                        (clojure.string/split #"\s+")))
       (pig/group-by identity)
       (pig/map (fn [[word occurrences]] [word (count occurrences)]))))
This defines a function that returns a PigPen query expression. The query takes a sequence of lines and returns how frequently each word appears. As you can see, this is just the word count logic. We don't have to conflate external concerns, like where our data is coming from or going to.
Will it compose?
Yep - PigPen queries are written as function compositions - data in, data out. Write it once and avoid the copy & paste routine.
Here we use our word-count function (defined above), along with a load and store command, to make a PigPen query:
(defn word-count-query [input output]
  (->> (pig/load-tsv input)
       (word-count)
       (pig/store-tsv output)))
This function returns the PigPen representation of the query. By itself, it won’t do anything - we have to execute it locally or generate a script (more on that later).
You like unit tests? Yeah, we do that
With PigPen, you can mock input data and write a unit test for your query. No more crossing your fingers & wondering what will happen when you submit to the cluster. No more separate files for test input & output.
Mocking data is really easy. With pig/return and pig/constantly, you can inject arbitrary data as a starting point for your script.
A common pattern is to use pig/take to sample a few rows of the actual source data. Wrap the result with pig/return and you've got mock data.
(use 'clojure.test)
(deftest test-word-count
  (let [data (pig/return [["The fox jumped over the dog."]
                          ["The cow jumped over the moon."]])]
    (is (= (pig/dump (word-count data))
           [["moon" 1]
            ["jumped" 2]
            ["dog" 1]
            ["over" 2]
            ["cow" 1]
            ["fox" 1]
            ["the" 4]]))))
The pig/dump operator runs the query locally.
Closures (yes, the kind with an S)
Parameterizing your query is trivial. Any in-scope function parameters or let bindings are available to use in functions.
(defn reusable-fn [lower-bound data]
  (let [upper-bound (+ lower-bound 10)]
    (pig/filter (fn [x] (< lower-bound x upper-bound)) data)))
Note that lower-bound and upper-bound are present when we generate the script, and are made available when the function is executed within the cluster.
So how do I use it?
Just tell PigPen where to write the query as a Pig script:
(pig/write-script "word-count.pig"
                  (word-count-query "input.tsv" "output.tsv"))
And then you have a Pig script which you can submit to your cluster. The script uses pigpen.jar, an uberjar with all of the dependencies, so make sure that it is submitted with the script. Another option is to build an uberjar for your project and submit that instead; just rename it prior to submission. Check out the tutorial for how to build an uberjar.
As you saw before, we can also use pig/dump to run the query locally and return Clojure data:
=> (def data (pig/return [["The fox jumped over the dog."]
                          ["The cow jumped over the moon."]]))
#'pigpen-demo/data
=> (pig/dump (word-count data))
[["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]
If you want to get started now, check out getting started & tutorials.
Why do I need Map-Reduce?
Map-Reduce is useful for processing data that won’t fit on a single machine. With PigPen, you can process massive amounts of data in a manner that mimics working with data locally. Map-Reduce accomplishes this by distributing the data across potentially thousands of nodes in a cluster. Each of those nodes can process a small amount of the data, all in parallel, and accomplish the task much faster than a single node alone. Operations such as join and group, which require coordination across the dataset, are computed by partitioning the data with a common join key. Each value of the join key is then sent to a specific machine. Once that machine has all of the potential values, it can then compute the join or do other interesting work with them.
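As a toy illustration of that idea in plain Clojure (not PigPen), grouping records by a join key collects each key's values together so they could all be handled on a single machine:

;; Plain Clojure, for illustration only: partitioning records by a join key.
(group-by :id [{:id 1, :a "abc"} {:id 1, :a "def"} {:id 2, :a "abc"}])
;; => {1 [{:id 1, :a "abc"} {:id 1, :a "def"}],
;;     2 [{:id 2, :a "abc"}]}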
To see how PigPen does joins, let's take a look at pig/cogroup. Cogroup takes an arbitrary number of datasets and groups them all by a common key. Say we have data that looks like this:
foo:
{:id 1, :a "abc"}
{:id 1, :a "def"}
{:id 2, :a "abc"}
bar:
[1 42]
[2 37]
[2 3.14]
baz:
{:my_id "1", :c [1 2 3]}
If we want to group all of these by id, it looks like this:
(pig/cogroup (foo by :id)
             (bar by first)
             (baz by #(-> % :my_id Long/valueOf))
             (fn [id foos bars bazs] ...))
The first three arguments are the datasets to join. Each one specifies a function that is applied to the source data to select the key. The last argument is a function that combines the resulting groups. In our example, it would be called twice, with these arguments:
[1 ({:id 1, :a "abc"} {:id 1, :a "def"})
   ([1 42])
   ({:my_id "1", :c [1 2 3]})]
[2 ({:id 2, :a "abc"})
   ([2 37] [2 3.14])
   ()]
As you can see, this consolidates all of the values with an id of 1, all of the values with 2, etc. Each different key value can then be processed independently on different machines. By default, keys are not required to be present in all sources, but there are options that can make them required.
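To make the combining function concrete, here is a hedged sketch (using the same illustrative syntax as the example above) that emits, for each id, the number of matching foos and the sum of the bar values:

;; A sketch using the example's syntax: the combining function sees all
;; matching values for one id at a time and can combine them however it likes.
(pig/cogroup (foo by :id)
             (bar by first)
             (baz by #(-> % :my_id Long/valueOf))
             (fn [id foos bars bazs]
               [id (count foos) (reduce + (map second bars))]))
;; For the sample data above this would produce [1 2 42] and [2 1 40.14].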
Hadoop provides a very low-level interface for doing map-reduce jobs, but it’s also very limited. It can only run a single map-reduce pair at a time as it has no concept of data flow or a complex query. Pig adds a layer of abstraction on top of Hadoop, but at the end of the day, it’s still a scripting language. You are still required to use UDFs (user defined functions) to do interesting tasks with your data. PigPen furthers that abstraction by making map-reduce available as a first class language.
If you are new to map-reduce, we encourage you to learn more here.
Motivations for creating PigPen
- Code reuse. We want to be able to define a piece of logic once, parameterize it, and reuse it for many different jobs.
- Consolidate our code. We don’t like switching between a script and a UDF written in different languages. We don’t want to think about mapping between differing data types in different languages.
- Organize our code. We want our code in multiple files, organized how we want - not dictated by the job it belongs to.
- Unit testing. We want our sample data inline with our unit tests. We want our unit tests to test the business logic of our jobs without complications of loading or storing data.
- Fast iteration. We want to be able to trivially inject mock data at any point in our jobs. We want to be able to quickly test a query without waiting for a JVM to start.
- Name what you want to. Most map-reduce languages force too many names and schemas on intermediate products. This can make it really difficult to mock data and test isolated portions of jobs. We want to organize and name our business logic as we see fit - not as dictated by the language.
- We’re done writing scripts; we’re ready to start programming!
Note: PigPen is not a Clojure wrapper for writing Pig scripts you can hand edit. While it’s entirely possible, the resulting scripts are not intended for human consumption.
Design & Features
PigPen was designed to match Clojure as closely as possible. Map-reduce is functional programming, so why not use an awesome functional programming language that already exists? Not only is there a lower learning curve, but most of the concepts translate very easily to big data.
In PigPen, queries are manipulated as expression trees. Each operation is represented as a map of information about what behavior is desired. These maps can be nested together to build a tree representation of a complex query. Each command also contains references to its ancestor commands. When executed, that query tree is converted into a directed acyclic query graph. This allows for easy merging of duplicate commands, optimizing sequences of related commands, and instrumenting the query with debug information.
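Purely for illustration (the real command maps use different and richer keys), a nested query tree might look something like this:

;; Illustrative only: PigPen's actual internal representation differs.
;; Each operation is a map; :ancestors nests the commands it depends on.
{:type      :filter
 :func      '(fn [x] (even? x))
 :ancestors [{:type      :map
              :func      '(fn [x] (* x x))
              :ancestors [{:type      :load
                           :location  "input.clj"
                           :ancestors []}]}]}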
Optimization
De-duping
When we represent our query as a graph of operations, de-duping them is trivial. Clojure provides value-equality, meaning that if two objects have the same content, they are equal. If any two operations have the same representation, then they are in fact identical. No care has to be taken to avoid duplicating commands when writing the query - they’re all optimized before executing it.
For example, say we have the following query:
(let [even-squares (->> (pig/load-clj "input.clj")
                        (pig/map (fn [x] (* x x)))
                        (pig/filter even?)
                        (pig/store-clj "even-squares.clj"))
      odd-squares (->> (pig/load-clj "input.clj")
                       (pig/map (fn [x] (* x x)))
                       (pig/filter odd?)
                       (pig/store-clj "odd-squares.clj"))]
  (pig/script even-squares odd-squares))
In this query, we load data from a file, compute the square of each number, and then split it into even and odd numbers. Here’s what a graph of this operation would look like:

This matches our query, but it's doing some extra work: it's loading the same input.clj file twice and computing the squares of all of the numbers twice. This might not seem like a lot of work, but when you do it on a lot of data, simple operations really add up. To optimize this query, we look for operations that are identical. At first glance, the operations that compute squares look like good candidates, but they have different parents, so we can't merge them yet. We can, however, merge the load functions because they don't have any parents and they load the same file.
Now our graph looks like this:

Now we’re loading the data once, which will save some time, but we’re still doing the squares computation twice. Since we now have a single load command, our map operations are now identical and can be merged:

Now we have an optimized query where each operation is unique. Because we always merge commands one at a time, we know that we’re not going to change the logic of the query. You can easily generate queries within loops without worrying about duplicated execution - PigPen will only execute the unique parts of the query.
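For example, a query built inside a loop (a hedged sketch; the thresholds and file names are made up) still loads and squares the input only once, because the identical load and map commands are merged:

;; Each iteration builds its own load and map commands, but they are
;; value-equal across iterations, so the optimizer merges them and the
;; input is loaded and squared only once.
(apply pig/script
       (for [threshold [10 100 1000]]
         (->> (pig/load-clj "input.clj")
              (pig/map (fn [x] (* x x)))
              (pig/filter (fn [x] (< x threshold)))
              (pig/store-clj (str "under-" threshold ".clj")))))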
Serialization
After we're done processing data in Clojure, our data must be serialized into a binary blob so Pig can move it around between machines in a cluster. This is an expensive but essential step for PigPen. Luckily, many consecutive operations in a script can often be packed together into a single operation. This saves a lot of time by not serializing and deserializing the data when we don't need to. For example, any consecutive map, filter, and mapcat operations can be rewritten as a single mapcat operation.
Let’s look at some examples to illustrate this:

In this example, we start with a serialized (blue) value, 4, deserialize it (orange), apply our map function, and re-serialize the value.
Now let's try a slightly more complex (and realistic) example. In this example, we apply a map, mapcat, and filter operation.
If you haven't used it before, mapcat is an operation where we apply a function to a value and that function returns a sequence of values. That sequence is then 'flattened' and each single value is fed into the next step. In Clojure, it's the result of combining map and concat. In Scala, this is called flatMap, and in C# it's called selectMany.
In the diagram below, the flow on the left is our query before the optimization; the right is after the optimization. We start with the same value, 4, and calculate the square of the value; same as the first example. Then we take our value and apply a function that decrements the value, returns the value, and increments the value. Pig then takes this set of values and flattens them, making each one an input to the next step. Note that we had to serialize and deserialize the data when interacting with Pig. The third and final step is to filter the data; in this example we’re retaining only odd values. As you can see, we end up serializing and deserializing the data in between each step.
The right-hand side shows the result of the optimization. Put simply, each operation now returns a sequence of elements. Our map operation returns a sequence of one element, 16, our mapcat remains the same, and our filter returns a sequence of zero or one elements. By making these commands more uniform, we can merge them more easily. We end up flattening more sequences of values within the set of commands, but there is no serialization cost between steps. While it looks more complex, this optimization results in much faster execution of each of these steps.
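The collapse itself can be sketched in plain Clojure (this is just the concept, not PigPen's actual rewrite): map and filter can each be expressed as mapcat over functions that return zero-or-one-element sequences, so a whole chain becomes a single mapcat pipeline with no serialization in between.

;; Plain-Clojure sketch of the concept, using the same values as the example:
;; 4 -> 16 -> (15 16 17) -> (15 17)
(defn map->mapcat    [f] (fn [x] [(f x)]))           ; one output per input
(defn filter->mapcat [p] (fn [x] (if (p x) [x] []))) ; zero or one output

(->> [4]
     (mapcat (map->mapcat (fn [x] (* x x))))  ; => (16)
     (mapcat (fn [x] [(dec x) x (inc x)]))    ; => (15 16 17)
     (mapcat (filter->mapcat odd?)))          ; => (15 17)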

Testing, Local Execution, and Debugging
Iterative development, testing, and debuggability are key tenets of PigPen. When you have jobs that can run for days at a time, the last thing you need is an unexpected bug showing up at the eleventh hour. PigPen has a local execution mode that's powered by rx. This allows us to write unit tests for our queries, so we can know with very high confidence that a query will not crash when run and will actually return the expected results. Even better, this feature allows for iterative development of queries.
Typically, we start with just a few records of the source data and use that to populate a unit test. Because PigPen returns data in the REPL, we don’t have to go elsewhere to build our test data. Then, using the REPL, we add commands to map, filter, join, and reduce the mock data as required; each step of the way verifying that the result is what we expect. This approach produces more reliable results than building a giant monolithic script and crossing your fingers. Another useful pattern is to break up large queries into smaller functional units. Map-reduce queries tend to explode and contract the source data by orders of magnitude. When you try to test the script as a whole, you often have to start with a very large amount of data to end up with just a few rows. By breaking the query into smaller parts, you can test the first part, which may take 100 rows to produce two; and then test the second part by using those two rows as a template to simulate 100 more fake ones.
Debug mode has proven to be really useful for fixing the unexpected. When enabled, it will write to disk the result of every operation in the script, in addition to the normal outputs. This is very useful in an environment such as Hadoop, where you can’t step through code and hours may pass in between operations. Debug mode can also be coupled with a graph-viz visualization of the execution graph. You can then visually associate what it plans to do with the actual output of each operation.
To enable debug mode, see the options for pig/write-script and pig/generate-script. It will write the extra debug output to the folder specified.
Example of debug mode enabled:
(pig/write-script {:debug "/debug-output/"} "my-script.pig" my-pigpen-query)
To enable visualization, take a look at pig/show and pig/dump&show.
Example of visualization:
(pig/show my-pigpen-query) ;; Shows a graph of the query
(pig/dump&show my-pigpen-query) ;; Shows a graph and runs it locally
Extending PigPen
One nice feature of PigPen is that it’s easy to build your own operators. For example, we built set and multi-set operators such as difference and intersection. These are just variants of other operations like co-group, but it’s really nice to define them once, test them thoroughly, and not have to think about the logic behind a multiset intersection of n sets ever again.
This is useful for more complex operators as well. We have a reusable statistics operator that computes the sum, avg, min, max, sd, and quantiles for a set of data. We also have a pivot operator that groups dimensional fields in the data and counts each group.
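As a flavor of what such an operator can look like (a sketch, not our actual implementation), a simple pivot can be composed from the same primitives used in the word-count example:

;; A sketch of a reusable pivot operator: group by a key-selecting function
;; and count each group. Not the actual Netflix implementation.
(defn pivot [key-fn data]
  (->> data
       (pig/group-by key-fn)
       (pig/map (fn [[k values]] [k (count values)]))))

;; Hypothetical usage: (pivot :country users) => pairs of [country count]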
While each of these by themselves are simple operations, when you abstract them out of your query, your query starts to become a lot smaller and simpler. When your query is smaller and simpler, you can spend more time focusing on the actual logic of the problem you’re trying to solve instead of re-writing basic statistics each time. What you’re doing becomes clearer, every step of the way.
Why Pig?
We chose Pig because we didn't want to re-implement all of the optimization work that has gone into Pig already. If you take away the language, Pig does an excellent job of moving big data around. Our strategy was to use Pig's DataByteArray binary format to move around serialized Clojure data. In most cases, we found that Pig didn't need to be aware of the underlying types present in the data. Byte arrays can be compared trivially and quickly, so for joins and groupings Pig simply needs to compare the serialized blobs. We get Clojure's great value equality for free, as equivalent data structures produce the same serialized output. Unfortunately, this doesn't hold true for sorting data: the sorted order of a binary blob is not useful and doesn't match the sorted order of the native data. To sort data, we must fall back to the host language, and as such we can only sort on simple types. This is one of the very few places where Pig has imposed a limitation on PigPen.
We did evaluate other languages before deciding to build PigPen. The first requirement was that it be an actual programming language, not just a scripting language with UDFs. We briefly evaluated Scalding, which looks very promising, but our team primarily uses Clojure. It could be said that PigPen is to Clojure what Scalding is to Scala. Cascalog is usually the go-to language for map-reduce in Clojure, but from past experience, Datalog has proven less than useful for everyday tasks: there is a complicated new syntax and new concepts to learn, aligning variable names to do implicit joins is not always ideal, misplaced ordering of operations can cause big performance problems, Datalog flattens data structures (which can be wasteful), and composition can be a mind bender.
We also evaluated a few options to use as a host language for PigPen. It would be possible to build a similar abstraction on top of Hive, but schematizing every intermediate product doesn’t fit well with the Clojure ideology. Also, Hive is similar to SQL, making translation from a functional language more difficult. There’s an impedance mismatch between relational models like SQL and Hive and functional models like Clojure or Pig. In the end, the most straightforward solution was to write an abstraction over Pig.
Future Work
Currently you can reference in-scope local variables within code that is executed remotely, as shown above. One limitation to this feature is that the value must be serializable. This has the downside of not being able to utilize compiled functions - you can’t get back the source code that created them in the first place. This means that the following won’t work:
(defn foo [x] ...)
(pig/map foo)
In this situation, the compiler will inform you that it doesn't recognize foo. We're playing around with different methods for requiring code remotely, but there are some nuances to this problem. Blindly loading the code that was present when the script was generated is an easy option, but it might not be ideal if that code accidentally runs something that was only intended to run locally. Another option would be for the user to explicitly specify what to load remotely, but this poses challenges as well, such as an elegant syntax to express what should be loaded. Everything we've tried so far is a little clunky, and jar hell with Hadoop doesn't make it any easier. That said, any code that's available can be loaded from within any user function. If you upload your uberjar, you can then use a require statement to load other arbitrary code.
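A hedged sketch of that workaround (the namespace and function names are hypothetical, and the namespace must already be on the classpath of the jar you submit):

;; Hypothetical names: my.lib must be available on the submitted jar's
;; classpath. Requiring and resolving inside the user function means the
;; code is looked up on the cluster rather than serialized with the script.
(pig/map (fn [x]
           (require 'my.lib)
           ((resolve 'my.lib/foo) x)))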
So far, performance in PigPen doesn’t seem to be an issue. Long term, if performance issues crop up, it will be relatively easy to migrate to running PigPen directly on Hadoop (or similar) without changing the abstraction. One of the key performance features we still have yet to build is incremental aggregation. Pig refers to this as algebraic operators (also referenced by Rich Hickey here as combining functions). These are operations that can compute partial intermediate products over aggregations. For example, say we want to take the average of a LOT of numbers - so many that we need map-reduce. The naive approach would be to move all of the numbers to one machine and compute the average. A better approach would be to partition the numbers, compute the sum and count of each of these smaller sets, and then use those intermediate products to compute the final average. The challenge for PigPen will be to consume many of these operations within a single function. For example, say we have a set of numbers and we want to compute the count, sum, and average. Ideally, we would want to define each of these computations independently as algebraic operations and then use them together over the same set of data, having PigPen do the work of maintaining a set of intermediate products. Effectively, we need to be able to compose and combine these operations while retaining their efficiency.
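The average example can be sketched in plain Clojure to show the shape of an algebraic operation: each partition produces a small intermediate product, the intermediates combine associatively, and the final answer is computed once at the end.

;; Plain-Clojure sketch of the combining-function idea for an average.
(defn partial-avg [nums] [(reduce + nums) (count nums)]) ; per-partition [sum count]
(defn combine [[s1 c1] [s2 c2]] [(+ s1 s2) (+ c1 c2)])   ; merge two partials
(defn finalize [[s c]] (/ s c))                          ; compute the average once

(let [partitions [[1 2 3] [4 5] [6 7 8 9]]]
  (finalize (reduce combine (map partial-avg partitions))))
;; => 5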
We use a number of other Pig & Hadoop tools at Netflix that will pair nicely with PigPen. We have some prototypes for integration with Genie, which adds a pig/submit operator. There's also a loader for Aegisthus data in the works. And PigPen works with Lipstick, as the resulting scripts are Pig scripts.
Conclusion
PigPen has been a lot of fun to build and we hope it’s even more fun to use. For more information on getting started with PigPen and some tutorials, check out the tutorial, or to contribute, take a look at our Github page: https://github.com/Netflix/PigPen
There are three distinct audiences for PigPen, so we wrote three different tutorials:
- Those coming from the Clojure community who want to do map-reduce: PigPen for Clojure users
- Those coming from the Pig community who want to use Clojure: PigPen for Pig users
- And a general tutorial for anybody wanting to learn it all: PigPen Tutorial
If you know both Clojure and Pig, you’ll probably find all of the tutorials interesting.
The full API documentation is located here.
And if you love big data, check out our jobs.