April 17 2013 in hadoop
The goal of this article is to provide a 10,000-foot view of Hadoop for those who know next to nothing about it. This article is not designed to get you ready for Hadoop development, but to give you a sound knowledge base from which to take the next steps in learning the technology.
Let’s get down to it:
Hadoop is an Apache Software Foundation project that importantly provides two things:
- A distributed filesystem called HDFS (Hadoop Distributed File System)
- A framework and API for building and running MapReduce jobs
I will talk about these two things in turn.
HDFS
HDFS is structured similarly to a regular Unix filesystem, except that data storage is distributed across several machines. It is not intended as a replacement for a regular filesystem, but rather as a filesystem-like layer for large distributed systems to use. It has built-in mechanisms to handle machine outages, and is optimized for throughput rather than latency.
There are two and a half types of machine in an HDFS cluster:
- Datanode - where HDFS actually stores the data; there are usually quite a few of these.
- Namenode - the ‘master’ machine. It controls all the metadata for the cluster, e.g. which blocks make up a file, and which datanodes those blocks are stored on.
- Secondary Namenode - this is NOT a backup namenode, but a separate service that keeps a copy of both the edit log and the filesystem image, merging them periodically to keep their size reasonable. (This is soon to be deprecated in favor of the Backup Node and the Checkpoint Node, but the functionality remains similar, if not the same.)
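To make the Namenode’s role more concrete, here is a minimal Java sketch of the kind of metadata it tracks: which blocks make up a file, and which datanodes hold each block. This is a hypothetical model, not HDFS code. The class and field names, the 64 MB block size, and the round-robin replica placement are all assumptions for illustration. Real HDFS uses rack-aware placement.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of Namenode metadata; not the real HDFS classes.
class NamenodeSketch {
    static final long BLOCK_SIZE = 64L * 1024 * 1024; // assumed block size: 64 MB
    static final int REPLICATION = 3;                 // replication factor discussed below

    private final List<String> datanodes;
    // file path -> ordered list of block ids that make up the file
    final Map<String, List<String>> fileToBlocks = new LinkedHashMap<>();
    // block id -> datanodes that hold a replica of that block
    final Map<String, List<String>> blockToDatanodes = new LinkedHashMap<>();
    private int nextNode = 0;

    NamenodeSketch(List<String> datanodes) {
        this.datanodes = datanodes;
    }

    // Record a new file: split it into fixed-size blocks and place each block's replicas.
    void addFile(String path, long sizeBytes) {
        long numBlocks = (sizeBytes + BLOCK_SIZE - 1) / BLOCK_SIZE; // ceiling division
        List<String> blocks = new ArrayList<>();
        for (long i = 0; i < numBlocks; i++) {
            String blockId = path + "#blk" + i;
            blocks.add(blockId);
            // Round-robin over distinct datanodes: a stand-in for rack-aware placement.
            int copies = Math.min(REPLICATION, datanodes.size());
            List<String> replicas = new ArrayList<>();
            for (int r = 0; r < copies; r++) {
                replicas.add(datanodes.get((nextNode + r) % datanodes.size()));
            }
            nextNode = (nextNode + 1) % datanodes.size();
            blockToDatanodes.put(blockId, replicas);
        }
        fileToBlocks.put(path, blocks);
    }

    public static void main(String[] args) {
        NamenodeSketch namenode = new NamenodeSketch(List.of("dn1", "dn2", "dn3", "dn4"));
        namenode.addFile("/data/users", 150L * 1024 * 1024); // 150 MB -> 3 blocks
        System.out.println(namenode.fileToBlocks);
        System.out.println(namenode.blockToDatanodes);
    }
}
```

With four datanodes, a 150 MB file becomes three blocks, and each block is stored on three different machines. Losing any single datanode therefore leaves at least two copies of every block.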

Data can be accessed using either the Java API, or the Hadoop command line client. Many operations are similar to their Unix counterparts. Check out the documentation page for the full list, but here are some simple examples:
# list files in the root directory
hadoop fs -ls /
# list files in my home directory
hadoop fs -ls ./
# cat a file (decompressing if needed)
hadoop fs -text ./file.txt.gz
# upload and retrieve a file
hadoop fs -put ./localfile.txt /home/matthew/remotefile.txt
hadoop fs -get /home/matthew/remotefile.txt ./local/file/path/file.txt
Note that HDFS is optimized differently from a regular filesystem. It is designed for batch (non-realtime) applications demanding high throughput, rather than online applications demanding low latency. For example, files cannot be modified once written, and read/write latency is poor by filesystem standards. On the flip side, throughput scales fairly linearly with the number of datanodes in a cluster, so HDFS can handle workloads no single machine would ever be able to.
HDFS also has several features that make it well suited to distributed systems:
- Fault tolerant - data can be replicated across multiple datanodes to protect against machine failures. A replication factor of 3 (every block is stored on three machines) is the common standard.
- Scalability - data transfers happen directly with the datanodes, so your read/write capacity scales fairly well with the number of datanodes.
- Space - need more disk space? Just add more datanodes and rebalance.
- Industry standard - lots of other distributed applications build on top of HDFS (HBase, MapReduce).
- Pairs well with MapReduce - as we shall see.
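Replication and the "just add more datanodes" point interact in a simple way. With a replication factor of 3, every byte you store consumes three bytes of raw disk across the cluster. Here is a back-of-the-envelope Java sketch of that arithmetic. The cluster sizes are made-up numbers, and the sketch ignores disk space reserved for anything other than HDFS.

```java
// Back-of-the-envelope HDFS capacity estimate (illustrative numbers only).
class CapacitySketch {
    // Usable capacity is the raw capacity divided by the replication factor.
    static double usableTerabytes(int datanodes, double terabytesPerNode, int replication) {
        double raw = datanodes * terabytesPerNode;
        return raw / replication;
    }

    public static void main(String[] args) {
        // Hypothetical cluster: 10 datanodes with 12 TB each, replication factor 3.
        System.out.println(usableTerabytes(10, 12.0, 3)); // 120 TB raw -> 40.0 TB usable
        // Adding 5 more datanodes grows usable space in proportion.
        System.out.println(usableTerabytes(15, 12.0, 3)); // 180 TB raw -> 60.0 TB usable
    }
}
```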
HDFS Resources
For more information about the design of HDFS, read through the Apache documentation page. In particular, the streaming and data access section has some really simple and informative diagrams of how data reads and writes actually happen.
MapReduce
The second fundamental part of Hadoop is the MapReduce layer. This is made up of two subcomponents:
- An API for writing MapReduce workflows in Java.
- A set of services for managing the execution of these workflows.
The Map and Reduce APIs
The basic premise is this:
- Map tasks perform a transformation.
- Reduce tasks perform an aggregation.
In Scala, a simplified version of a word-count MapReduce job might look like this (output(key, value) stands for whatever mechanism the framework provides for emitting a pair):
def map(lineNumber: Long, sentence: String) = {
  val words = sentence.split("\\s+")
  words.foreach { word =>
    output(word, 1L)
  }
}
def reduce(word: String, counts: Iterable[Long]) = {
  var total = 0L
  counts.foreach { count =>
    total += count
  }
  output(word, total)
}
Notice that the output of both map and reduce is always in the form of KEY, VALUE pairs: each call to output() emits exactly one key and one value (a single map call may emit many pairs, or none). The input to a reduce is KEY, ITERABLE[VALUE]. Reduce is called exactly once for each distinct key output by the map phase, and the ITERABLE[VALUE] is the set of all values output by the map phase for that key.
So if you had map tasks that output
map1: key: foo, value: 1
map2: key: foo, value: 32
Your reducer would receive:
key: foo, values: [1, 32]
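The contract above can be simulated on a single machine. This minimal Java sketch does three things. It runs the word-count map function over a few lines. It then groups the emitted pairs by key, standing in for the work the framework does between the phases. Finally, it calls reduce once per distinct key. It uses no Hadoop classes, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process simulation of map -> group-by-key -> reduce (no Hadoop involved).
class LocalMapReduce {
    // map: one input line -> zero or more (word, 1) pairs
    static List<Map.Entry<String, Long>> map(String sentence) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1L));
            }
        }
        return out;
    }

    // reduce: one key plus all of its values -> a single total
    static long reduce(String word, Iterable<Long> counts) {
        long total = 0L;
        for (long c : counts) {
            total += c;
        }
        return total;
    }

    static Map<String, Long> run(List<String> lines) {
        // Stand-in for the framework: collect every value emitted for a key into one list.
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Long> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        // reduce is called exactly once per distinct key.
        Map<String, Long> result = new TreeMap<>();
        grouped.forEach((word, counts) -> result.put(word, reduce(word, counts)));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("foo bar", "foo baz foo"))); // {bar=1, baz=1, foo=3}
    }
}
```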
Counterintuitively, one of the most important parts of a MapReduce job is what happens between map and reduce. There are three other stages: Partitioning, Sorting, and Grouping. In the default configuration, the goal of these intermediate steps is to guarantee the behavior described above: that the values for each key are grouped together, ready for the reduce() function. APIs are also provided if you want to tweak how these stages work (for example, to perform a secondary sort).
Here’s a diagram of the full workflow to try and show how these pieces all fit together, but at this stage it’s more important to understand how map and reduce interact than to understand all the specifics of how that interaction is implemented.
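As a rough illustration of the partitioning step, consider Hadoop’s default HashPartitioner. It sends a key to reducer number `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`, so every pair with the same key lands on the same reducer. The Java sketch below reuses that formula with plain String keys. Within each partition it keeps keys sorted and their values grouped. The class and method names are hypothetical. Hadoop would call hashCode() on its own key type (for example Text), so real partition numbers can differ from this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of hash partitioning followed by sorting/grouping inside each partition.
class PartitionSketch {
    // Same formula as Hadoop's default HashPartitioner, applied to java.lang.String keys.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Route each (key, value) pair to a partition. A TreeMap per partition keeps keys
    // sorted and gathers all values for a key together, as one reduce() call would see them.
    static List<TreeMap<String, List<Long>>> partition(List<Map.Entry<String, Long>> pairs, int numReduceTasks) {
        List<TreeMap<String, List<Long>>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new TreeMap<>());
        }
        for (Map.Entry<String, Long> pair : pairs) {
            partitions.get(partitionFor(pair.getKey(), numReduceTasks))
                      .computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                      .add(pair.getValue());
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Pairs as two different map tasks might emit them (compare the foo example above).
        List<Map.Entry<String, Long>> pairs =
            List.of(Map.entry("foo", 1L), Map.entry("bar", 7L), Map.entry("foo", 32L));
        List<TreeMap<String, List<Long>>> partitions = partition(pairs, 2);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("reducer " + i + " gets " + partitions.get(i));
        }
    }
}
```

The key alone decides the partition. That is why the values 1 and 32 for foo end up in the same list, even though different map tasks emitted them.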

What’s really powerful about this API is that there is no dependency between any two tasks of the same type. To do its job, a map() task does not need to know about other map tasks. Similarly, a single reduce() task has all the context it needs to aggregate for any particular key, and it does not share any state with other reduce tasks.
Taken as a whole, this design means that the stages of the pipeline can be distributed to an arbitrary number of machines. Workflows over massive datasets can be spread across hundreds of machines, because there are no inherent dependencies between the tasks that would require them to run on the same machine.
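One way to see why the lack of shared state matters: if each map call depends only on its own input, then running the calls in parallel must give the same answer as running them one after another. This Java sketch checks that on a single machine. Parallel streams stand in for a cluster, and groupingBy plus counting stand in for the shuffle and reduce. It is an analogy only, not how Hadoop schedules tasks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stateless map/reduce steps give identical results whether run serially or in parallel.
class IndependenceSketch {
    // "map": split one line into words; it depends only on its own input line.
    static Stream<String> words(String line) {
        return Arrays.stream(line.trim().split("\\s+")).filter(w -> !w.isEmpty());
    }

    static Map<String, Long> count(List<String> lines, boolean parallel) {
        Stream<String> source = parallel ? lines.parallelStream() : lines.stream();
        // groupingBy + counting plays the role of shuffle + reduce.
        return source.flatMap(IndependenceSketch::words)
                     .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the quick brown fox", "the lazy dog", "the end");
        System.out.println(count(lines, false).equals(count(lines, true))); // true
    }
}
```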
MapReduce API Resources
If you want to learn more about MapReduce (generally, and within Hadoop), I recommend reading the Google MapReduce paper, the Apache MapReduce documentation, or maybe even the Hadoop book. A web search for MapReduce tutorials also turns up a lot of useful information.
To make things more interesting, many projects have been built on top of the MapReduce API to ease the development of MapReduce workflows. For example, Hive lets you query data on HDFS with SQL instead of Java. There are many more examples, so if you’re interested in learning more about these frameworks, I’ve written a separate article about the most common ones.
The Hadoop Services for Executing MapReduce Jobs
Hadoop MapReduce comes with two primary services for scheduling and running MapReduce jobs: the Job Tracker (JT) and the Task Tracker (TT). Broadly speaking, the JT is the master and is in charge of allocating tasks to task trackers and scheduling these tasks globally. A TT is in charge of running the map and reduce tasks themselves.
When running, each TT registers itself with the JT and reports the number of ‘map’ and ‘reduce’ slots it has available. The JT keeps a central registry of these slots across all TTs and allocates them to jobs as required. When a task is completed, the TT re-registers that slot with the JT and the process repeats.
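The slot bookkeeping just described can be modeled as a small registry. This is a hypothetical Java sketch of the idea, not the JobTracker’s real code. Each TaskTracker registers its free map and reduce slots. The registry hands out a slot when a task needs one, and a finished task gives its slot back.

```java
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of a JobTracker-style registry of free task slots (not Hadoop's code).
class SlotRegistrySketch {
    enum SlotType { MAP, REDUCE }

    // slot type -> (task tracker name -> number of free slots)
    private final Map<SlotType, Map<String, Integer>> free = new EnumMap<>(SlotType.class);

    SlotRegistrySketch() {
        for (SlotType type : SlotType.values()) {
            free.put(type, new LinkedHashMap<>());
        }
    }

    // A TaskTracker registers itself and reports how many slots of each type it has.
    void register(String tracker, int mapSlots, int reduceSlots) {
        free.get(SlotType.MAP).put(tracker, mapSlots);
        free.get(SlotType.REDUCE).put(tracker, reduceSlots);
    }

    // Hand out one free slot of the given type, returning the tracker that owns it.
    Optional<String> allocate(SlotType type) {
        for (Map.Entry<String, Integer> entry : free.get(type).entrySet()) {
            if (entry.getValue() > 0) {
                entry.setValue(entry.getValue() - 1);
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty(); // no free slot anywhere: the task has to wait
    }

    // When a task completes, its tracker re-registers the slot it was using.
    void release(String tracker, SlotType type) {
        free.get(type).merge(tracker, 1, Integer::sum);
    }

    int freeSlots(SlotType type) {
        return free.get(type).values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

In this sketch a tracker that registers 2 map slots can run two map tasks at once. A third map task only gets a slot on that tracker after one of the first two calls release.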
Many things can go wrong in a big distributed system, so these services have some clever tricks to ensure that your job finishes successfully:
- Automatic retries - if a task fails, it is retried N times (usually 3) on different task trackers.
- Data locality optimizations - if you co-locate a TT with an HDFS Datanode (which you should), it will take advantage of data locality to make reading the data faster
- Blacklisting a bad TT - if the JT detects that a TT has too many failed tasks, it will blacklist it. No tasks will then be scheduled on this task tracker.
- Speculative Execution - the JT can schedule the same task to run on several machines at the same time, just in case some machines are slower than others. When one version finishes, the others are killed.
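The retry and blacklisting tricks can be sketched together. This is a hypothetical Java model, and its names and thresholds are assumptions for illustration. It allows 3 retries after the first attempt, following "usually 3" above, and blacklists a tracker after a made-up threshold of 4 failures. Each attempt goes to a different, non-blacklisted tracker. Hadoop’s actual configuration and logic differ in detail, and speculative execution is not modeled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of task retries across trackers, with blacklisting of bad trackers.
class RetrySketch {
    static final int MAX_RETRIES = 3;          // retries allowed after the first attempt
    static final int BLACKLIST_THRESHOLD = 4;  // assumed failures before a tracker is blacklisted

    final Map<String, Integer> failures = new HashMap<>();
    final Set<String> blacklist = new HashSet<>();

    // Try the task on successive non-blacklisted trackers; `runsOn` reports success.
    // Returns the tracker that succeeded, or null if every allowed attempt failed.
    String runTask(List<String> trackers, Predicate<String> runsOn) {
        int attempts = 0;
        for (String tracker : trackers) {
            if (blacklist.contains(tracker)) {
                continue; // never schedule on a blacklisted tracker
            }
            if (attempts > MAX_RETRIES) {
                break; // first attempt plus MAX_RETRIES retries used up
            }
            attempts++;
            if (runsOn.test(tracker)) {
                return tracker;
            }
            int count = failures.merge(tracker, 1, Integer::sum);
            if (count >= BLACKLIST_THRESHOLD) {
                blacklist.add(tracker);
            }
        }
        return null;
    }
}
```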
Here’s a simple diagram of a typical deployment with TTs deployed alongside datanodes. 
MapReduce Service Resources
For more reading on the JobTracker and TaskTracker, check out Wikipedia or the Hadoop book. I find the Apache documentation pretty confusing when just trying to understand these things at a high level, so again, a web search can be pretty useful.
Wrap Up
I hope this introduction to Hadoop was useful. There is a lot of information online, but I didn’t feel like anything described Hadoop at a high level for beginners.
The Hadoop project is a good deal more complex and deep than I have represented and is changing rapidly. For example, an initiative called MapReduce 2.0 provides a more general purpose job scheduling and resource management layer called YARN, and there is an ever growing range of non-MapReduce applications that run on top of HDFS, such as Cloudera Impala.
If you breezed through this article and the related readings and you’re ready and eager to write some MapReduce jobs, you might want to check out my article on MapReduce frameworks. I also plan on creating a second article on how to start working with Hadoop as a beginner, when I do I will update this article with a link.
Please get in touch in the comments or on twitter with any questions!
April 07 2013 in hadoop
This article is part of my guide to map reduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.
The Problem
Let me quickly restate the problem from my original article.
I have two datasets:
- User information (id, email, language, location)
- Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)
Given these datasets, I want to find the number of unique locations in which each product has been sold. To do that, I need to join the two datasets together.
I previously implemented a solution to this problem using the standard MapReduce API in Java, and in Hive. It took ~500 lines of code in Java, and many, many hours of my spare time.
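To pin down exactly what the script needs to compute, here is a small in-memory Java sketch of the same logic. It does a left outer join of transactions to users, then counts the distinct locations per product. The record classes and the sample rows used to exercise it are hypothetical. Transactions whose user is missing produce a null location. Like Pig’s COUNT and Hive’s count(distinct ...), the sketch ignores those nulls.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// In-memory reference for "unique locations per product" via a left outer join.
class UniqueLocationsSketch {
    static final class User {
        final int id;
        final String location;
        User(int id, String location) { this.id = id; this.location = location; }
    }

    static final class Transaction {
        final int product;
        final int user;
        Transaction(int product, int user) { this.product = product; this.user = user; }
    }

    static Map<Integer, Integer> uniqueLocations(List<User> users, List<Transaction> transactions) {
        // Index users by id (the right-hand side of the join).
        Map<Integer, String> locationById = new HashMap<>();
        for (User u : users) {
            locationById.put(u.id, u.location);
        }
        // Left outer join: every transaction is kept; an unmatched user yields a null location.
        Map<Integer, Set<String>> locationsByProduct = new TreeMap<>();
        for (Transaction t : transactions) {
            Set<String> locs = locationsByProduct.computeIfAbsent(t.product, p -> new HashSet<>());
            String loc = locationById.get(t.user);
            if (loc != null) {
                locs.add(loc); // a Set keeps each location once, like DISTINCT
            }
        }
        // Count the distinct, non-null locations for each product.
        Map<Integer, Integer> result = new TreeMap<>();
        locationsByProduct.forEach((product, locs) -> result.put(product, locs.size()));
        return result;
    }
}
```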
The Pig Solution
Apache Pig is a framework for analyzing datasets that consists of a high-level scripting language (called Pig Latin) and infrastructure for compiling these scripts into a graph of MapReduce jobs. Pig Latin is designed specifically for parallel data manipulation, is pretty easy to learn, and is extensible through a set of fairly simple APIs.
The Script
Unlike with Hive, Pig does not require me to set up a centralized metastore of data sources and formats. Instead I specify the directory and the format in a LOAD statement unique to each script.
USERS = load 'data/users' using PigStorage('\t') as (id:int, email:chararray, language:chararray, location:chararray);
TRANSACTIONS = load 'data/transactions' using PigStorage('\t') as (id:int, product:int, user:int, purchase_amount:double, description:chararray);
A = JOIN TRANSACTIONS by user LEFT OUTER, USERS by id;
B = GROUP A by product;
C = FOREACH B {
    LOCS = DISTINCT A.location;
    GENERATE group, COUNT(LOCS) as location_count;
};
DUMP C;
Notice how, as in a conventional programming language, a Pig script is a sequence of individual statements, rather than a single giant query (as with SQL).
The GROUP leading into the nested FOREACH is the most confusing part of the script, so let me explain it a little.
After I generate dataset B using GROUP, I end up with a dataset that looks like this:
group: product, A: bag of tuples (id, email, location, etc.)
The field that you group on is renamed to ‘group’, and the original records are collected into a bag named A (the name of the relation before the group).
The nice thing about pig is that you can inspect the format of a dataset in between any statement:
grunt> DESCRIBE B;
B: {group: int,A: {(TRANSACTIONS::id: int,TRANSACTIONS::product: int,TRANSACTIONS::user: int,TRANSACTIONS::purchase_amount: double,TRANSACTIONS::description: chararray,USERS::id: int,USERS::email: chararray,USERS::language: chararray,USERS::location: chararray)}}
The script then iterates over every grouped piece of data and finds a count of all distinct locations.
Note that no MapReduce jobs are executed until the script calls DUMP (or STORE). DUMP prints the results to the console; to write them out instead, we call STORE. For example, to store the data in tab-delimited format at an output location called ‘results’:
STORE C INTO 'results' USING PigStorage('\t');
Testing
In theory, it is pretty trivial to test a Pig script from Java in JUnit using PigUnit. In practice, however, PigUnit is limited.
Most strikingly, if you want to define your test data within your test, PigUnit only allows you to test scripts with a single input and a single output, which is somewhat unrealistic.
To run more complex scripts it is necessary to parameterize your scripts, then pass in the source paths as arguments.
Finally, for whatever reason, Apache does not publish a PigUnit jar to any public Maven repository. Thankfully, Cloudera publishes the PigUnit jar to its repository as part of its CDH releases (including CDH4), and since we’re already using that repository, we don’t need to change anything.
Both the Pig script and a JUnit test covering the script can be found in my code example repository. I had to modify the query to take arguments for the input and output files:
USERS = load '$USERS' using PigStorage('\t')
as (id:int, email:chararray, language:chararray, location:chararray);
TRANSACTIONS = load '$TRANSACTIONS' using PigStorage('\t')
as (id:int, product:int, user:int, purchase_amount:double, description:chararray);
A = JOIN TRANSACTIONS by user LEFT OUTER, USERS by id;
B = GROUP A by product;
C = FOREACH B {
    LOCS = DISTINCT A.location;
    GENERATE group, COUNT(LOCS) as location_count;
};
STORE C INTO '$OUTPUT';
Here is the meat of the test code (the full code is in the repository mentioned above):
import org.apache.pig.pigunit.PigTest;
import org.junit.Test;

public class ScriptTest {
    // THIS DOES NOT WORK
    @Test
    public void test() throws Exception {
        // output is ignored by PigTest
        String[] args = new String[]{
            "OUTPUT=foo",
            "USERS=src/test/resources/users.txt",
            "TRANSACTIONS=src/test/resources/transactions.txt"
        };
        String script = "src/main/pig/script.pig";
        PigTest test = new PigTest(script, args);
        // expected (product, location_count) tuples for alias C
        String[] expectedOutput = {"(1,3)", "(2,1)"};
        test.assertOutput("C", expectedOutput);
    }
}
The alternative to parameterizing your Pig script is to try to extend PigUnit so that it can test more than one input programmatically. Clint Miller has some code examples on SlideShare for this, but I was unable to get them working when I tried.
Thoughts
Pig sits somewhere between SQL and Java in complexity and flexibility. Scripts are really quick and easy to write, they are a little easier to deploy, and, importantly, they are mostly simple to test.
I’ve also been working through the APIs for creating custom pig functions and loaders, and while it’s not always exactly simple, it is a much easier task than creating Hive functions, so your options for extensibility are pretty good. Twitter has a set of libraries in their Elephant Bird project that can make this even easier.
February 27 2013 in personal
Version 2.0 of Simple Check-in is now in the Google Play store.
This release adds a long-press function to venues that allows you to add a shout/comment to your check-in, and optionally share your check-in on Facebook and Twitter.
You should see the application update trickle through in the next few hours.
Here is a screenshot to prove it:

February 20 2013 in hadoop
This article is part of my guide to map reduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.
The Problem
Let me quickly restate the problem from my original article.
I have two datasets:
- User information (id, email, language, location)
- Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)
Given these datasets, I want to find the number of unique locations in which each product has been sold. To do that, I need to join the two datasets together.
I previously implemented a solution to this problem using the standard MapReduce API in Java. It took ~500 lines of code, and many, many hours of my spare time.
The Hive Solution
Hive is a tool that translates SQL queries into sequences of MapReduce jobs. You interact with it much the same way you interact with a traditional database: it has a command-line shell for interactive querying, and if you deploy the included Thrift server you can interact with it from any programming language, or even use the JDBC drivers.
Setup
Before querying our datasets, we have to create table schemas that represent the data. As with a regular database, you cannot query data until you define a table. Unlike a regular database, Hive tables can point to data that already exists (external tables).
Here are our table definitions. You can see that we’re specifying how the data is delimited (ROW FORMAT) and where the data is stored (LOCATION). In a real Hadoop deployment, the LOCATION is a directory on HDFS; when running a ‘local’ cluster, it is simply a file path.
Note the LOCATION paths must be absolute.
Users Table
CREATE EXTERNAL TABLE users(
id INT,
email STRING,
language STRING,
loc STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/data/users';
Transactions Table
CREATE EXTERNAL TABLE transactions(
id INT,
productId INT,
userId INT,
purchaseAmount INT,
itemDescription STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/data/transactions';
The Query
Now that our tables are defined, we can get on with the meat of the problem:
SELECT
productId,
count(distinct loc)
FROM
transactions t
LEFT OUTER JOIN
users u on t.userId = u.id
GROUP BY productId;
That’s it: 8 lines of SQL, and I’m pretty liberal with my newlines. Compared to ~500 lines of Java, this is a big improvement.
Below are the results of the query, which I ran on the same data used in the Java example. Both the query and the setup script can be seen on GitHub. The results are consistent with the Java example’s results: product 1 was sold in 3 locations, and product 2 was sold in only one location.
OK
1 3
2 1
Time taken: 7.076 seconds
More importantly, we can change our query pretty easily. Let’s say we want the product description rather than the product ID; changing the query is trivial:
SELECT
itemDescription,
count(distinct loc)
FROM
transactions t
LEFT OUTER JOIN
users u on t.userId = u.id
GROUP BY itemDescription;
Making this change would be time consuming (to say the least) in our java example. You’d probably end up generalizing the whole thing and building your own query language.
Such comments might be a little unfair to Java. Hive is clearly well-suited to this use case, yet would be cumbersome in situations more suited to a programming framework.
Thoughts
- The amount of code required for Hive is much less than the code required for Java.
- Although the language is more concise for data querying, you can’t do anything complex, like making HTTP requests or updating other systems.
- To query a dataset you have to declare a table. If you have a lot of datasets, you have a lot of tables.
- Hive queries are harder to test than simply using a unit-testing framework.
February 14 2013 in hadoop

Inspired by the devops and ruby weekly newsletters, Hadoop Weekly is an email-newsletter designed to provide “a summary of the week’s top news in the Apache Hadoop™ ecosystem”. The newsletter is curated by Joe Crobak, a long-time hadooper currently working at foursquare.
Each issue includes 10+ links along with a (very) useful summary of the article linked-to. The summary means that simply skimming this week’s issue might be enough to keep you in the know without having to read everything in full.
At the time of writing, the newsletter has published 4 issues and has already attracted several hundred subscribers. I myself have found it invaluable for catching up on the top Hadoop stories of the moment.
Check it out, or subscribe to the RSS feed if you don’t like emails.