Real World Hadoop - Implementing a Left Outer Join in Map Reduce
This article is part of my guide to map reduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.
If you’re impatient, you can find the code for the map-reduce implementation on my GitHub. Otherwise, read on!
Let me quickly restate the problem from my original article.
I have two datasets:
- User information (id, email, language, location)
- Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)
Given these datasets, I want to find the number of unique locations in which each product has been sold.
A Single Threaded Solution
- For each transaction, look up the user record for the transaction’s userId
- Join the user records to each transaction
- Create a mapping from productId to a list of locations
- Count the number of distinct locations per productId.
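The four steps above can be sketched in memory in a few lines. The version below is a minimal illustration in plain Java (the language of the Hadoop code discussed later), not the listing from the original article. The class name, the `String[]{productId, userId}` transaction layout, and the `"UNKNOWN"` placeholder for transactions whose user is missing are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class SingleThreadedJoin {
    /**
     * userLocations: userId -> location.
     * transactions: each element is {productId, userId} (hypothetical layout).
     * Returns productId -> number of distinct locations.
     */
    static Map<String, Integer> distinctLocationsPerProduct(
            Map<String, String> userLocations, List<String[]> transactions) {
        Map<String, Set<String>> locationsByProduct = new TreeMap<>();
        for (String[] tx : transactions) {
            String productId = tx[0];
            String userId = tx[1];
            // Left outer join: keep the transaction even if the user is unknown.
            // "UNKNOWN" is an assumed placeholder and counts as its own location.
            String location = userLocations.getOrDefault(userId, "UNKNOWN");
            locationsByProduct
                .computeIfAbsent(productId, k -> new HashSet<>())
                .add(location);
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : locationsByProduct.entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("1", "US");
        users.put("2", "GB");
        List<String[]> txs = Arrays.asList(
            new String[]{"p1", "1"},
            new String[]{"p1", "2"},
            new String[]{"p1", "1"},
            new String[]{"p2", "2"});
        System.out.println(distinctLocationsPerProduct(users, txs)); // {p1=2, p2=1}
    }
}
```

Everything lives in two hash maps here, which is exactly what stops working once the datasets no longer fit on one machine.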
Here’s one way to do this using Scala:
The Map Reduce Solution
First off, the problem requires that we write a two-stage map-reduce job:
- Join users onto transactions and emit a set of product-location pairs (one for each transaction)
- For each product sold, count the # of distinct locations it was sold in
We’re basically building a left outer join with map reduce.
Stage 1
- the transaction map task outputs (K,V) with K = userId and V = productId
- the user map task outputs (K,V) with K = userId and V = location
- the reducer receives both the user's location and the user's productIds, and outputs (K,V) with K = productId and V = location
Stage 2
- the map task is an identity mapper and outputs (K,V) with K = productId and V = location
- the reducer counts the number of unique locations that it sees per productId and outputs (K,V) with K = productId and V = # distinct locations
While this sounds fairly simple, when it comes to implementation, there are a few problems with this workflow.
The Naive Solution
The naive solution is basically the workflow described above. In stage 1, both map tasks key their outputs by userId, and it is the reducer’s job to match a user to their transactions.
However, the order in which values arrive at the reduce() method is unspecified, because sorting in Hadoop is performed on keys only, and all keys for a particular user are identical (they’re all the same userId).
Given this, for the reducer in stage 1 to join the two datasets together, it has to read all values into memory, find the one value containing the user's location, then emit the remaining values along with it.
Doing reduce-side computation in this way defeats many of the benefits of using map reduce, because a non-trivial proportion of the dataset must fit into memory. The naive algorithm only scales as long as no single user has more than N transactions, where
N = available heap-space / size of a record.
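To make the memory problem concrete, here is a rough plain-Java sketch of what such a naive stage-1 reducer has to do. It does not use the Hadoop API. The class name and the `"L:"` / `"P:"` prefixes used to tell a location value from a productId value are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class NaiveJoinReducer {
    /**
     * Values for one userId arrive in arbitrary order.
     * "L:<location>" marks the user record, "P:<productId>" marks a transaction
     * (both prefixes are hypothetical).
     * Returns {productId, location} pairs.
     */
    static List<String[]> reduce(String userId, Iterator<String> values) {
        String location = null;
        // This buffer grows with the number of transactions for the user,
        // so one very active user can exhaust the reducer's heap.
        List<String> bufferedProducts = new ArrayList<>();
        while (values.hasNext()) {
            String value = values.next();
            if (value.startsWith("L:")) {
                location = value.substring(2);
            } else {
                bufferedProducts.add(value.substring(2));
            }
        }
        // Nothing can be emitted until every value has been read,
        // because the location may be the very last one.
        List<String[]> output = new ArrayList<>();
        for (String productId : bufferedProducts) {
            output.add(new String[]{productId, location});
        }
        return output;
    }

    public static void main(String[] args) {
        Iterator<String> values = Arrays.asList("P:p1", "L:US", "P:p2").iterator();
        for (String[] pair : reduce("u1", values)) {
            System.out.println(pair[0] + "\t" + pair[1]);
        }
    }
}
```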
Stage 2 has similar issues. The reducer will receive a list of locations for each particular product, but if those locations are not sorted, it will need to maintain an in-memory data structure to filter out duplicates, and to create accurate results.
The Scalable Solution
We do not want our reducer to scan through all values in order to find a location record. The easiest way to avoid this is to ensure that the first value in the iterator is the user’s location.
Reflecting on the design of map reduce, remember that between map and reduce, three other things happen:
- Partitioning – this defines which reducer partition the record is assigned to
- Sorting – the order of the records in the partition, based on key.
- Grouping – whether record N+1 accompanies record N in the same call to the reduce() function. Again based on key.
Using these components in coordination with a composite key, consisting of a primary key and a secondary key, we can partition and group on the primary key while still sorting by the secondary key. This is called a secondary sort.
First we need a class to represent the composite key. In stage-1 this key would contain:
1. The primary key: userId (for partitioning, and grouping)
2. The secondary key: a single character (for sorting)
I called this class TextTuple. It contains two values, left and right, both of type Text.
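As a rough picture of such a composite key, here is a plain-Java stand-in. It is not the TextTuple class from the repository: the name `TextTupleSketch` is made up, and `String` replaces Hadoop's `Text` so the example runs without Hadoop on the classpath. A real Hadoop key would also need to implement `WritableComparable` (with `write` and `readFields`) so it can be serialized between map and reduce.

```java
import java.util.Objects;

/** Hypothetical stand-in for a two-part composite key. */
class TextTupleSketch implements Comparable<TextTupleSketch> {
    final String left;   // primary key, e.g. userId
    final String right;  // secondary key, e.g. "a" or "b"

    TextTupleSketch(String left, String right) {
        this.left = left;
        this.right = right;
    }

    /** Natural order: primary key first, then secondary key. */
    @Override
    public int compareTo(TextTupleSketch other) {
        int byLeft = left.compareTo(other.left);
        return byLeft != 0 ? byLeft : right.compareTo(other.right);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TextTupleSketch)) {
            return false;
        }
        TextTupleSketch other = (TextTupleSketch) o;
        return left.equals(other.left) && right.equals(other.right);
    }

    @Override
    public int hashCode() {
        return Objects.hash(left, right);
    }

    @Override
    public String toString() {
        return "(" + left + "," + right + ")";
    }

    public static void main(String[] args) {
        TextTupleSketch user = new TextTupleSketch("u1", "a");
        TextTupleSketch transaction = new TextTupleSketch("u1", "b");
        System.out.println(user.compareTo(transaction) < 0); // true
    }
}
```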
Our secondary sort is simple, as we just want user records to appear before transactions. A simple solution for stage 1 is to set the user-record secondary key to “a” and the transaction-record secondary key to “b”. Because “a” sorts before “b”, user records will appear first.
My implementation for partitioning, grouping and sorting looks something like this:
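The sketch below is not the code from the repository. It is a minimal plain-Java illustration of the three rules, with a hypothetical `Key` class standing in for the composite key. In a real job these would be a `Partitioner` and two comparators registered on the `Job` via `setPartitionerClass`, `setSortComparatorClass` and `setGroupingComparatorClass`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SecondarySortSketch {
    /** Hypothetical composite key: left = userId, right = "a" (user) or "b" (transaction). */
    static final class Key {
        final String left;
        final String right;

        Key(String left, String right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return "(" + left + "," + right + ")";
        }
    }

    /** Partitioning: only the primary key decides which reducer gets the record. */
    static int partition(Key key, int numPartitions) {
        return (key.left.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    /** Sorting: primary key first, then secondary key, so "a" precedes "b". */
    static final Comparator<Key> SORT = (x, y) -> {
        int byLeft = x.left.compareTo(y.left);
        return byLeft != 0 ? byLeft : x.right.compareTo(y.right);
    };

    /** Grouping: keys with the same primary key share one reduce() call. */
    static final Comparator<Key> GROUP = (x, y) -> x.left.compareTo(y.left);

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>(Arrays.asList(
            new Key("u2", "b"), new Key("u1", "b"),
            new Key("u1", "a"), new Key("u2", "a")));
        keys.sort(SORT);
        System.out.println(keys); // [(u1,a), (u1,b), (u2,a), (u2,b)]
    }
}
```

The important detail is that the partitioner and the grouping comparator ignore the secondary key entirely, while the sort comparator uses both parts.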
Now I can be sure that:
- all keys with the same userId are sent to the same partition
- all keys with the same userId are sent to the same call to reduce()
- user records will precede transaction records
- furthermore, as user records are unique, only the first value in the iterator contains a location; all other records are transactions
With these assertions, I can implement my minimal state reducer:
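What follows is a plain-Java sketch of such a reducer rather than the listing from the repository. It assumes each value arrives as a `{tag, payload}` pair, where the tag is the secondary key (“a” for the user record, “b” for a transaction), and it takes a hypothetical `emit` callback in place of Hadoop's `context.write`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BiConsumer;

class StreamingJoinReducer {
    /**
     * sortedValues: {tag, payload} pairs for one userId, user record first.
     * Tag "a" carries the location, tag "b" carries a productId
     * (this encoding is an assumption of the sketch).
     * emit: receives (productId, location) for every transaction.
     */
    static void reduce(String userId,
                       Iterator<String[]> sortedValues,
                       BiConsumer<String, String> emit) {
        // The only state kept is the user's location. It stays null when the
        // user record is missing, which gives left-outer-join behaviour.
        String location = null;
        while (sortedValues.hasNext()) {
            String[] value = sortedValues.next();
            if ("a".equals(value[0])) {
                location = value[1];
            } else {
                // Each transaction is emitted as soon as it is read: no buffering.
                emit.accept(value[1], location);
            }
        }
    }

    public static void main(String[] args) {
        Iterator<String[]> values = Arrays.asList(
            new String[]{"a", "US"},
            new String[]{"b", "p1"},
            new String[]{"b", "p2"}).iterator();
        reduce("u1", values,
            (productId, location) -> System.out.println(productId + "\t" + location));
    }
}
```

Memory use is now constant per user, however many transactions that user has.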
The same logic applies to stage-2. If my primary key is productId, and my secondary key is location, I can group records by productId, then sort by location. This again ensures that my reducer does not need to maintain state.
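A stage-2 reducer along these lines might look like the following plain-Java sketch, which is an illustration and not the repository code. It assumes the locations for one productId arrive already sorted, so duplicates are adjacent and only the previous location needs to be remembered.

```java
import java.util.Arrays;
import java.util.Iterator;

class DistinctLocationCounter {
    /**
     * sortedLocations: locations for one productId, in sorted order.
     * Because equal locations are adjacent, a new distinct location is
     * detected by comparing with the previous value only.
     */
    static int countDistinct(Iterator<String> sortedLocations) {
        int count = 0;
        String previous = null;
        while (sortedLocations.hasNext()) {
            String location = sortedLocations.next();
            if (!location.equals(previous)) {
                count++;
                previous = location;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Iterator<String> locations = Arrays.asList("FR", "GB", "GB", "US", "US").iterator();
        System.out.println(countDistinct(locations)); // 3
    }
}
```

Compared with the naive approach, no set of previously seen locations has to be held in memory.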
The full implementation is available on github.
I cannot stress enough the importance of testing your map reduce jobs, but that is a separate topic in itself. In the meantime, I urge you to check out the accompanying integration test.
- The amount of boilerplate code required to chain two map reduce jobs together is staggering. CLOC reports that I’ve written 400 lines of code to implement this.
- How you implement a left outer join is unintuitive, as are some of the API details (e.g. sorting and grouping can only operate on keys, not values).
- Honestly, whenever I have to join datasets in real life, I go straight to higher-level frameworks.
If you’re interested in seeing how to solve the same problem using other map reduce frameworks, check back with my guide to map-reduce frameworks over the coming weeks. My goal is to create an example for every framework listed.