title: Hadoop MapReduce Tutorial - Left Outer Join
description: I walk through using Hadoop MapReduce to join two datasets together and rank the results. I talk through the concepts involved and provide the full working code for you to use.
subject: hadoop
layout: post
tags:
map-reduce
secondary sort
hadoop
integration testing
hdfs
java
scala
This article is part of my guide to map-reduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.
If you're impatient, you can find the code for the map-reduce implementation on my GitHub; otherwise, read on!
Stage 1
transaction map task outputs (K,V) with K = userId, and V = productId
user map task outputs (K,V) with K = userId, and V = location (both map tasks are sketched below)
reducer gets both the user location and their productIds, and thus outputs (K,V) with K = productId, and V = location
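To make this concrete, here is a rough sketch of what the two Stage 1 map tasks might look like. The input field layout and class names are illustrative assumptions, not the repository's actual code:

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Assumed transaction input lines: transactionId,productId,userId,...
public class TransactionMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split(",");
        context.write(new Text(fields[2]), new Text(fields[1])); // K = userId, V = productId
    }
}

// Assumed user input lines: userId,email,language,location
// (lives in its own file; shown alongside for brevity)
public class UserMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split(",");
        context.write(new Text(fields[0]), new Text(fields[3])); // K = userId, V = location
    }
}
```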
Stage 2
map task is an identity mapper, outputs (K,V) with K = productId and V = location
reducer counts the number of unique locations that it sees per productId, outputs (K,V), K = productId, and V = # distinct locations
While this sounds fairly simple, when it comes to implementation, there are a few problems with this workflow.
Implementation
The Naive Solution
This works basically as described above: in Stage 1 both map tasks key their outputs by userId, and it is the reducer's job to match a user to their transactions.
However, the order in which values arrive at the reduce() method is unspecified, because sorting in Hadoop is performed on a key-by-key basis, and all keys for a particular user are identical (they're all the same userId).
Given this, for the Stage 1 reducer to join the two datasets it will have to read all values into memory, find the single value containing the user's location, then emit the remaining values along with it.
Doing reduce-side computation in this way defeats many of the benefits of using map reduce, as a non-trivial proportion of the dataset must fit into memory. The scalability of this naive algorithm hinges on no single user having more than N transactions, where N ≈ available heap-space / record size.
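To illustrate the problem, here is a sketch of what a naive Stage 1 reducer ends up looking like. I assume the mappers tag user-location values with a "location:" prefix so the reducer can tell them apart from productIds; that convention, and the class name, are illustrative rather than what the repository actually does:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Naive Stage 1 reducer: values arrive in no particular order, so every
// productId must be buffered in memory until the location value turns up.
public class NaiveJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text userId, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String location = "";                        // left outer join: default when no user record exists
        List<String> productIds = new ArrayList<>(); // grows with the user's transaction count
        for (Text value : values) {
            String v = value.toString();
            if (v.startsWith("location:")) {         // illustrative tag added by the user mapper
                location = v.substring("location:".length());
            } else {
                productIds.add(v);
            }
        }
        for (String productId : productIds) {
            context.write(new Text(productId), new Text(location));
        }
    }
}
```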
Stage 2 has similar issues. The reducer will receive a list of locations for each particular product, but if those locations are not sorted, it will need to maintain an in-memory data structure to filter out duplicates, and to create accurate results.
The Scalable Solution
We do not want our reducer to scan through all values in order to find a location record. The easiest way to avoid this is to ensure that the first value in the iterator is the user’s location.
Reflecting on the design of map reduce, remember that between map and reduce, three other things happen:
Partitioning – defines which reducer partition a record is assigned to.
Sorting – defines the order of the records within a partition, based on key.
Grouping – defines whether record N+1 accompanies record N in the same call to the reduce() function, again based on key.
Using these components in coordination with a composite key, consisting of a primary and a secondary key, we can partition and group on the primary key, yet sort by the secondary key. This is called a secondary sort.
First we need a class to represent the composite key. In Stage 1 this key contains:
The primary key: userId (for partitioning, and grouping)
The secondary key: a single character (for sorting)
I called this class TextTuple; it contains two values, left and right, both of type Text.
Our secondary sort is simple, as we just want user records to appear before transactions. A simple solution for Stage 1 is to set the user-record secondary key to "a", and the transaction-record secondary key to "b"; "a" sorts before "b", so user records will appear first.
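The actual TextTuple class is in the repository; a minimal sketch of a composite key along these lines might look as follows:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Composite key: 'left' is the primary key (userId in Stage 1),
// 'right' is the secondary key ("a" for user records, "b" for transactions).
public class TextTuple implements WritableComparable<TextTuple> {
    private final Text left = new Text();
    private final Text right = new Text();

    public TextTuple() {}

    public TextTuple(String left, String right) {
        this.left.set(left);
        this.right.set(right);
    }

    public Text getLeft()  { return left; }
    public Text getRight() { return right; }

    @Override
    public void write(DataOutput out) throws IOException {
        left.write(out);
        right.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        left.readFields(in);
        right.readFields(in);
    }

    // Natural ordering: primary key first, then secondary key (this is the sort order).
    @Override
    public int compareTo(TextTuple other) {
        int cmp = left.compareTo(other.left);
        return cmp != 0 ? cmp : right.compareTo(other.right);
    }

    @Override
    public int hashCode() { return 31 * left.hashCode() + right.hashCode(); }

    @Override
    public boolean equals(Object o) {
        return o instanceof TextTuple
                && left.equals(((TextTuple) o).left)
                && right.equals(((TextTuple) o).right);
    }
}
```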
With a custom partitioner, sort order, and grouping comparator (sketched below), we can guarantee that:
all keys with the same userId are sent to the same partition
all keys with the same userId are sent to the same call to reduce()
user records will precede transaction records.
Furthermore, as user records are unique, only the first value in the iterator can contain a location; all other records are transactions.
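The partitioner and grouping comparator that provide these guarantees might look something like the sketch below (class names are illustrative; the repository has the real versions):

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Partition on the primary key only, so every record for a given userId
// lands in the same reducer partition regardless of its secondary key.
public class PrimaryKeyPartitioner extends Partitioner<TextTuple, Text> {
    @Override
    public int getPartition(TextTuple key, Text value, int numPartitions) {
        return (key.getLeft().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Group on the primary key only, so every record for a given userId is
// delivered to the same call to reduce(), in secondary-key (sorted) order.
public class PrimaryKeyGroupingComparator extends WritableComparator {
    public PrimaryKeyGroupingComparator() {
        super(TextTuple.class, true); // instantiate keys so deserialized objects can be compared
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((TextTuple) a).getLeft().compareTo(((TextTuple) b).getLeft());
    }
}
```

These are wired into the job with job.setPartitionerClass(...) and job.setGroupingComparatorClass(...); the sort itself falls out of TextTuple's compareTo (or a dedicated comparator registered with job.setSortComparatorClass(...)).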
With these assertions in place, I can implement my minimal-state reducer.
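The full working reducer is on GitHub; a sketch of the minimal-state idea follows. Emitting an empty location when no user record exists (to preserve the left outer join) is my assumption here:

```java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Stage 1 reducer: thanks to the secondary sort, if a user record exists it is
// always the first value in the iterator, so nothing needs to be buffered.
public class JoinReducer extends Reducer<TextTuple, Text, Text, Text> {
    @Override
    protected void reduce(TextTuple key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Text location = new Text("");            // left outer join: default when there is no user record
        boolean first = true;
        for (Text value : values) {
            if (first && "a".equals(key.getRight().toString())) {
                location = new Text(value);      // the user record: remember the location
            } else {
                context.write(value, location);  // a transaction: emit (productId, location)
            }
            first = false;
        }
    }
}
```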
The same logic applies to Stage 2: if my primary key is productId and my secondary key is location, I can group records by productId, then sort by location. This again ensures that my reducer does not need to maintain state.
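For completeness, here is a sketch of what the Stage 2 reducer might look like (again, the class name is illustrative). Because locations arrive sorted, duplicates are always adjacent, so a single "previous location" variable replaces the in-memory set of the naive version:

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Stage 2 reducer: locations arrive sorted (they are the secondary key), so
// counting distinct locations only requires remembering the previous one.
public class DistinctLocationReducer extends Reducer<TextTuple, Text, Text, IntWritable> {
    @Override
    protected void reduce(TextTuple key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int distinct = 0;
        String previous = null;
        for (Text value : values) {
            String location = value.toString();
            if (!location.equals(previous)) {
                distinct++;          // a location we have not seen before
                previous = location;
            }
        }
        // K = productId, V = # distinct locations
        context.write(new Text(key.getLeft()), new IntWritable(distinct));
    }
}
```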
I cannot stress enough the importance of testing your map reduce jobs, but that is a separate topic in itself. In the meantime, I urge you to check out the accompanying integration test.
Thoughts
The amount of boilerplate code required to chain two map reduce jobs together is staggering. CLOC reports that I've written 400 lines of code to implement this.
How you implement a left outer join is unintuitive, as are some of the API details (e.g. sorting and grouping can only operate on keys, not values).
Honestly, whenever I have to join datasets in real life I go straight to higher-level frameworks.
If you’re interested in seeing how to solve the same problem using other map reduce frameworks, check back with my guide to map-reduce frameworks over the coming weeks. My goal is to create an example for every framework listed.