Real World Hadoop - Implementing a Left Outer Join in Pig
This article is part of my guide to MapReduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.
Let me quickly restate the problem from my original article.
I have two datasets:
- User information (id, email, language, location)
- Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)
Given these datasets, I want to find the number of unique locations in which each product has been sold. To do that, I need to join the two datasets together.
The Pig Solution
Apache Pig is a framework for analyzing datasets. It consists of a high-level scripting language (called Pig Latin) and the infrastructure for compiling these scripts into a graph of MapReduce jobs. Pig Latin is designed specifically for parallel data manipulation, is pretty easy to learn, and is extensible through a set of fairly simple APIs.
Unlike with Hive, Pig does not require me to set up a centralized metastore of data sources and formats. Instead I specify the directory and the format in a LOAD statement unique to each script.
Notice how (like a real programming language) a Pig script is a sequence of individual statements, instead of a single giant function (like with SQL).
The GROUP BY leading into the second FOREACH is the most confusing part of the script, so let me explain that a little.
After I generate dataset C using GROUP, I end up with a dataset that looks like this:
group:product, A: tuple(id, email, location, etc)
The field that you group on is renamed to ‘group’, and the second field is a bag named A (the name of the parent dataset before the group) that holds the complete tuples from that dataset, grouping field included.
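To make that shape concrete, here is a minimal sketch in plain Java (not Pig) of what grouping produces: one entry per key, each holding a bag of the full original rows. The Row class, its three fields, and the sample values are hypothetical and only stand in for the joined dataset A.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupShapeSketch {

    // Hypothetical joined row; only the fields needed for the illustration.
    static final class Row {
        final String product;
        final String email;
        final String location;

        Row(String product, String email, String location) {
            this.product = product;
            this.email = email;
            this.location = location;
        }
    }

    // Mimics grouping A by product: the map key plays the role of 'group',
    // the list plays the role of the bag named A (full rows, key included).
    static Map<String, List<Row>> groupByProduct(List<Row> a) {
        Map<String, List<Row>> grouped = new LinkedHashMap<>();
        for (Row row : a) {
            grouped.computeIfAbsent(row.product, k -> new ArrayList<>()).add(row);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Row> a = new ArrayList<>();
        a.add(new Row("p1", "a@example.com", "US"));
        a.add(new Row("p1", "b@example.com", "UK"));
        a.add(new Row("p2", "a@example.com", "US"));

        for (Map.Entry<String, List<Row>> e : groupByProduct(a).entrySet()) {
            System.out.println("group=" + e.getKey() + ", A has " + e.getValue().size() + " rows");
        }
    }
}
```

Each row inside a bag still carries its product field, which is why a later step can reach into the bag for any column, such as location.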
The nice thing about Pig is that you can inspect the format of a dataset between any two statements, for example with DESCRIBE.
The script then iterates over every grouped piece of data and finds a count of all distinct locations.
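As a rough illustration of what the whole pipeline computes, the following plain-Java sketch (again, not Pig) performs the left outer join, the grouping, and the distinct count in memory. The class name, method name, and sample data are hypothetical, and skipping null locations for transactions with no matching user is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DistinctLocationsSketch {

    // userLocations: user-id -> location (taken from the user dataset).
    // transactions: each entry is {product-id, user-id}; other fields are omitted.
    static Map<String, Integer> countDistinctLocations(
            Map<String, String> userLocations, List<String[]> transactions) {
        // Left outer join plus group: every transaction is kept, and a
        // transaction whose user is unknown contributes a null location.
        Map<String, Set<String>> locationsByProduct = new TreeMap<>();
        for (String[] tx : transactions) {
            String product = tx[0];
            String location = userLocations.get(tx[1]); // null when no user matches
            Set<String> locations =
                locationsByProduct.computeIfAbsent(product, k -> new LinkedHashSet<>());
            if (location != null) { // assumption: unknown locations are not counted
                locations.add(location);
            }
        }

        // Distinct count per group: the set has already removed duplicates.
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : locationsByProduct.entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("1", "US");
        users.put("2", "UK");
        users.put("3", "US");

        List<String[]> transactions = Arrays.asList(
            new String[] {"p1", "1"},
            new String[] {"p1", "2"},
            new String[] {"p1", "3"},
            new String[] {"p2", "1"},
            new String[] {"p3", "9"}); // user 9 does not exist

        // prints {p1=2, p2=1, p3=0}
        System.out.println(countDistinctLocations(users, transactions));
    }
}
```

Product p3 still appears in the output with a count of zero, which is the point of using a left outer join rather than an inner join: a product sold only to unknown users is not silently dropped.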
Note that no MapReduce jobs get executed until the script calls DUMP or STORE. DUMP prints the results to the console; to write the results to a file, we call STORE. For example, to store the data in tab-delimited format in a file called ‘results’:
STORE C INTO 'results' USING PigStorage('\t');
In theory it is pretty trivial to test a Pig script from Java in JUnit using PigUnit. In reality, however, PigUnit is limited.
Most strikingly, if you want to define your test data within your test, PigUnit only allows you to test scripts with a single input and a single output, which is somewhat unrealistic.
To run more complex scripts it is necessary to parameterize your scripts, then pass in the source paths as arguments.
Finally, for whatever reason, Apache does not publish a PigUnit jar to any public Maven repository. Thankfully, Cloudera publishes the PigUnit jar to their repository as part of their CDH releases, and as we’re using this repository already we don’t really need to change anything. Here it is for CDH4:
Both the Pig script and a JUnit test covering the script can be found in my code example repository. I had to modify the query to take arguments for the input and output files:
Here is the meat of the test code (full code available here):
The alternative to parameterizing your Pig script is to try to extend the functionality of PigUnit to test more than one input programmatically. Clint Miller has some code examples on SlideShare for this, but I was unable to get them working when I tried.
Pig sits somewhere between SQL and Java in complexity and flexibility. You can write a script fairly quickly, as you can with SQL, but you can also deploy and test it a little more easily, and importantly it is mostly simple to test.
I’ve also been working through the APIs for creating custom Pig functions and loaders, and while it’s not always exactly simple, it is a much easier task than creating Hive functions, so your options for extensibility are pretty good. Twitter has a set of libraries in their Elephant Bird project that can make this even easier.