Real World Hadoop - Implementing a Left Outer Join in Pig

Learning Hadoop and Spark?

I've scoured the internet and I think this free Big Data course from UC San Diego is a great way to jump in. It's hosted on Coursera, so you can audit the course for free.

This article is part of my guide to map reduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.

The Problem

Let me quickly restate the problem from my original article.

I have two datasets:

  1. User information (id, email, language, location)
  2. Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)

Given these datasets, I want to find the number of unique locations in which each product has been sold. To do that, I need to join the two datasets together.

I previously implemented a solution to this problem using the standard map-reduce api in Java, and Hive. It took ~500 lines of code in Java, and many many hours of my spare time.

The Pig Solution

Apache Pig is a framework for analyzing datasets that consists of a high-level scripting language (called Pig Latin), and infrastructure for compiling these scripts into a graph of Map Reduce jobs. Pig Latin is designed specifically for parallel data manipulation, is pretty easy to learn, and is extensible through a set of fairly simple APIs.

The Script

Unlike with Hive, Pig does not require me to set up a centralized metastore of data sources and formats. Instead I specify the directory and the format in a LOAD statement unique to each script.

USERS = load 'data/users' using PigStorage('\t') as (id:int, email:chararray, language:chararray, location:chararray);

TRANSACTIONS = load 'data/transactions' using PigStorage('\t') as (id:int, product:int, user:int, purchase_amount:double, description:chararray);

A = JOIN TRANSACTIONS by user LEFT OUTER, USERS by id;

B = GROUP A by product;

C = FOREACH B {
  LOCS = DISTINCT A.location;
  GENERATE group, COUNT(LOCS) as location_count;
};

DUMP C;

Notice how (like a real programming languages) a Pig script is a sequence of individual statements, instead of a single giant function (like with SQL).

The GROUP BY leading into the second FOREACH is the most confusing part of the script, so let me explain that a little.

After I generate dataset C using GROUP. I end up with a dataset that looks like this:

group:product, A: tuple(id, email, location, etc)

The field that you group on is renamed to ‘group’, and the remaining fields are part of a struct, named A (which is the name of the parent dataset before the group).

The nice thing about pig is that you can inspect the format of a dataset in between any statement:

grunt> DESCRIBE B;

B: {group: int,A: {(TRANSACTIONS::id: int,TRANSACTIONS::product: int,TRANSACTIONS::user: int,TRANSACTIONS::purchase_amount: double,TRANSACTIONS::description: chararray,USERS::id: int,USERS::email: chararray,USERS::language: chararray,USERS::location: chararray)}}

The script then iterates over every grouped piece of data and finds a count of all distinct locations.

Note that no map-reduce jobs get executed until the script calls DUMP. To write the results to a file, we call STORE. For example to store the data in tab delimited format in a file called ‘results’:

STORE C INTO 'results' USING PigStorage('\t');

Testing

In theory it is pretty trivial to test a pig script from Java in JUnit using Pig Unit. However, in reality pigunit is limited.

Most strikingly, if you want to define your test data within your test, pigunit only allows you to test scripts with a single input, and a single output, which is somewhat unrealistic.

To run more complex scripts it is necessary to parameterize your scripts, then pass in the source paths as arguments.

Finally, for whatever reason, Apache does not publish a pig-unit jar to any public maven repository, but thankfully, Cloudera publish the PigUnit jar to their repository as part of their CDH releases, as we’re using this repository already we don’t really need to change anything. Here it is for CDH4

Both the pig script, and a JUnit test covering the script can be found in my code example repository. I had to modify to query to take arguments for input / output files:

USERS = load '$USERS' using PigStorage('\t') 
  as (id:int, email:chararray, language:chararray, location:chararray);

TRANSACTIONS = load '$TRANSACTIONS' using PigStorage('\t') 
  as (id:int, product:int, user:int, purchase_amount:double, description:chararray);

A = JOIN TRANSACTIONS by user LEFT OUTER, USERS by id;

B = GROUP A by product;

C = FOREACH B {
  LOCS = DISTINCT A.location;
  GENERATE group, COUNT(LOCS) as location_count;
};

STORE C INTO '$OUTPUT';

Here is the meat of the test code (full code available here):

public class ScriptTest {
  // THIS DOES NOT WORK
  @Test
  public void test() throws Exception {
    // output is ignored by pigtest
    String[] args = new String[]{
      "OUTPUT=foo", 
      "USERS=src/test/resources/users.txt",
      "TRANSACTIONS=src/test/resources/transactions.txt"
    };
    String script = "src/main/pig/script.pig";
    
    
    PigTest test = new PigTest(script, args);
    String[] expectedOutput = {"(1,3)", "(2,1)"};
    test.assertOutput("C", expectedOutput);
  }
}

The alternative to parameterizing your pig script is to try and extend the functionality of pigunit to test more than one input programatically. Clint Miller has some code examples on slideshare for this, but I was unable to get it working when I tried.

Thoughts

Pig sits somewhere between SQL and Java in complexity and flexibility. You can write queries fairly quickly, but can also deploy and test them a little easier. It’s really quick and easy to write a script, and importantly it is mostly simple to test.

I’ve also been working through the APIs for creating custom pig functions and loaders, and while it’s not always exactly simple, it is a much easier task than creating Hive functions, so your options for extensibility are pretty good. Twitter has a set of libraries in their Elephant Bird project that can make this even easier.

Pig Resources

Matthew Rathbone's Picture

Matthew Rathbone

CEO of Beekeeper Data. British. Data Nerd. Lucky husband and father. More about me

Need More Hadoop Reading?

I've collected a list of the top Hadoop books on the market

Join the discussion