---
title: Hadoop MapReduce Tutorial - Left Outer Join
description: I walk through using Hadoop MapReduce to join two datasets together and rank the results. I talk through the concepts involved and provide the full working code for you to use.
subject: hadoop
layout: post
tags:
---
This article is part of my guide to map-reduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.
If you’re impatient, you can find the code for the map-reduce implementation on my GitHub; otherwise, read on!
Let me quickly restate the problem from my original article.
I have two datasets:

1. User information (id, email, language, location)
2. Transaction information (id, product-id, user-id, amount, description)
Given these datasets, I want to find the number of unique locations in which each product has been sold.
Here’s one way to do this using Scala:
case class User(id: Int, email: String, lang: String, location: String)
case class Transaction(id: Int, productId: Int, userId: Int, amount: Double, desc: String)
val users: List[User] = getUsers()
val transactions: List[Transaction] = getTransactions()
// make a map for O(1) location lookups from userId
// we're assuming all entries are unique
val userToLocationMap = users.map{u => (u.id, u.location)}.toMap
// if no location is found, we're using 'NA'
// end up with a list of Tuple(productId, location)
val joined = transactions.map{t => (t.productId, userToLocationMap.getOrElse(t.userId, "NA"))}
// get a list of distinct locations seen for each product id
// eg: "product1" -> List("US", "GB", ... )
val grouped: Map[Int, List[String]] = joined.groupBy(_._1).mapValues{vs => vs.map(_._2).distinct }
grouped.foreach { case (product, locations) =>
  println("%s: %d".format(product, locations.size))
}
First off, the problem requires that we write a two-stage map-reduce. We’re basically building a left outer join with map-reduce:

Stage 1 (join users to transactions):

- Map over transactions, emitting K = userId, V = productId
- Map over users, emitting K = userId, V = location
- Reduce over userId, joining each transaction to its user’s location and emitting K = productId, V = location

Stage 2 (count distinct locations per product):

- Map over the stage-1 output, emitting K = productId, V = location
- Reduce over productId, counting distinct locations and emitting K = productId, V = # distinct locations
While this sounds fairly simple, when it comes to implementation, there are a few problems with this workflow.
Stage 1 works basically as described above: both map tasks key their outputs by userId, and it is the reducer’s job to match a user to their transactions.
However, the order in which values arrive at the reduce() method is unspecified, because sorting in Hadoop is performed on a key-by-key basis, and all keys for a particular user are identical (they’re all the same userId).
Given this, for the stage-1 reducer to join the two datasets it would have to read all values into memory, find the single value containing the user’s location, and then emit the remaining values along with it.
Doing reduce-side computation in this way defeats many of the benefits of using map-reduce, as a non-trivial proportion of the dataset must fit into memory. The scalability of this naive algorithm hinges on no single user having more than N transactions, where N is roughly the number of records that fit in the available heap space (available heap-space / size of a record).
Stage 2 has similar issues. The reducer will receive a list of locations for each particular product, but if those locations are not sorted, it will need to maintain an in-memory data structure to filter out duplicates and produce accurate counts.
We do not want our reducer to scan through all values in order to find a location record. The easiest way to avoid this is to ensure that the first value in the iterator is the user’s location.
Reflecting on the design of map-reduce, remember that between map and reduce three other things happen:

- Partitioning: deciding which reducer each record is sent to
- Sorting: ordering the records destined for each reducer by key
- Grouping: deciding which records end up in the same reduce() call

Using these components in coordination with a composite key, consisting of a primary and a secondary key, we can perform partitioning and grouping on the primary key, yet still sort by the secondary key. This is called a secondary sort.
First we need a class to represent the composite key. In stage 1 this key would contain:

- the userId (the primary key, used for partitioning and grouping)
- a marker for the record type (the secondary key, used only for sorting)

I called this class TextTuple; it contains two values, left and right, both of type Text.
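The full class lives in the repository; as a rough sketch (the constructors and method details here are my own approximation, not necessarily the repository's exact code), a Writable composite key like TextTuple might look like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// A composite key: 'left' holds the primary key (e.g. userId),
// 'right' holds the secondary key (e.g. the sort character).
public class TextTuple implements WritableComparable<TextTuple> {
  public Text left = new Text();
  public Text right = new Text();

  public TextTuple() {}

  public TextTuple(String left, String right) {
    this.left = new Text(left);
    this.right = new Text(right);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    left.write(out);
    right.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    left.readFields(in);
    right.readFields(in);
  }

  // Default ordering: primary key first, then secondary key.
  @Override
  public int compareTo(TextTuple other) {
    int c = left.compareTo(other.left);
    return c != 0 ? c : right.compareTo(other.right);
  }
}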
Our secondary sort is simple, as we just want user records to appear before transactions. A simple solution for stage-1 is to set the user-record secondary key to “a”, and the transaction-record secondary key to “b”. a comes before b, so user records will appear first.
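To make this concrete, here is a sketch of what the two stage-1 mappers could look like. The field positions, tab-delimited parsing, and the "location"/"productId" value tags are assumptions on my part, but they line up with what the reducer further down expects:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Stage1Mappers {

  // users file: emit K = (userId, "a"), V = ("location", location)
  public static class UserMapper extends Mapper<LongWritable, Text, TextTuple, TextTuple> {
    @Override
    public void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t"); // assuming tab-separated input
      String userId = fields[0];
      String location = fields[3];
      context.write(new TextTuple(userId, "a"), new TextTuple("location", location));
    }
  }

  // transactions file: emit K = (userId, "b"), V = ("productId", productId)
  public static class TransactionMapper extends Mapper<LongWritable, Text, TextTuple, TextTuple> {
    @Override
    public void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t"); // assuming tab-separated input
      String productId = fields[1];
      String userId = fields[2];
      context.write(new TextTuple(userId, "b"), new TextTuple("productId", productId));
    }
  }
}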
My implementation for partitioning, grouping, and sorting looks something like this:
public class SecondarySort {

  // Partition only by UID
  public static class SSPartitioner extends Partitioner<TextTuple, Object> {
    @Override
    public int getPartition(TextTuple k, Object value, int partitions) {
      return (k.left.hashCode() & Integer.MAX_VALUE) % partitions;
    }
  }

  // Group only by UID
  public static class SSGroupComparator extends TTRawComparator {
    @Override
    public int compare(TextTuple first, TextTuple second) {
      return first.left.compareTo(second.left);
    }
  }

  // But sort by UID and the sort character.
  // Remember: location has a sort character of 'a'
  // and product-id has a sort character of 'b',
  // so the first record will be the location record!
  public static class SSSortComparator extends TTRawComparator {
    @Override
    public int compare(TextTuple first, TextTuple second) {
      int lCompare = first.left.compareTo(second.left);
      if (lCompare == 0) return first.right.compareTo(second.right);
      else return lCompare;
    }
  }
}
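For these classes to take effect they have to be registered on the stage-1 job. Here is a rough sketch of the driver wiring, using the sketch mapper classes from above; the class names, argument order, and use of MultipleInputs are my assumptions, not necessarily how the repository structures its driver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Stage1Driver {
  public static void main(String[] args) throws Exception {
    // args: <users path> <transactions path> <output path>
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "join-users-transactions");
    job.setJarByClass(Stage1Driver.class);

    // one mapper per input dataset, both emitting TextTuple keys and values
    MultipleInputs.addInputPath(job, new Path(args[0]),
        TextInputFormat.class, Stage1Mappers.UserMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]),
        TextInputFormat.class, Stage1Mappers.TransactionMapper.class);

    job.setMapOutputKeyClass(TextTuple.class);
    job.setMapOutputValueClass(TextTuple.class);

    // the secondary-sort plumbing: partition and group on userId, sort on the full key
    job.setPartitionerClass(SecondarySort.SSPartitioner.class);
    job.setGroupingComparatorClass(SecondarySort.SSGroupComparator.class);
    job.setSortComparatorClass(SecondarySort.SSSortComparator.class);

    job.setReducerClass(JoinReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}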
Now I can be sure that:

- all records for a given user are sent to the same reducer (partitioning)
- they all arrive in a single reduce() call (grouping)
- the user's location record, if one exists, is the first value in the iterator (sorting)
With these assertions, I can implement my minimal state reducer:
// the first value is location
// if it's not, we don't have a user record, so we'll
// record the location as UNKNOWN
public class JoinReducer extends Reducer<TextTuple, TextTuple, Text, Text> {
  @Override
  public void reduce(TextTuple key, Iterable<TextTuple> values, Context context)
      throws java.io.IOException, InterruptedException {
    // reset for every key, otherwise a user with no user record
    // would silently inherit the previous user's location
    Text location = new Text("UNKNOWN");
    for (TextTuple value : values) {
      if (value.left.toString().equals("location")) {
        location = new Text(value.right);
        continue;
      }
      Text productId = value.right;
      context.write(productId, location);
    }
  }
}
The same logic applies to stage-2. If my primary key is productId, and my secondary key is location, I can group records by productId, then sort by location. This again ensures that my reducer does not need to maintain state.
public static class SecondReducer extends Reducer<TextTuple, Text, Text, LongWritable> {
  LongWritable valueOut = new LongWritable();

  @Override
  public void reduce(TextTuple product, Iterable<Text> locations, Context context)
      throws java.io.IOException, InterruptedException {
    String previous = null;
    long totalLocations = 0;
    for (Text location : locations) {
      if (previous == null || !location.toString().equals(previous)) {
        totalLocations += 1;
        previous = location.toString();
      }
    }
    valueOut.set(totalLocations);
    context.write(product.left, valueOut);
  }
}
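For reference, stage 2 also needs a mapper that rebuilds the composite key from the stage-1 output. Assuming stage 1 wrote tab-separated productId/location pairs (the default TextOutputFormat behaviour), a sketch of that mapper, again an approximation rather than the repository's exact code, could look like this:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// reads stage-1 output lines of the form: productId <TAB> location
// emits K = (productId, location) so that grouping happens on productId
// while values within a group arrive sorted by location
public class SecondMapper extends Mapper<LongWritable, Text, TextTuple, Text> {
  @Override
  public void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    String[] fields = line.toString().split("\t");
    String productId = fields[0];
    String location = fields[1];
    context.write(new TextTuple(productId, location), new Text(location));
  }
}

Since SSPartitioner and SSGroupComparator only ever look at the left half of the key, the same classes can be reused to partition and group stage 2 by productId.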
The full implementation is available on GitHub.
I cannot stress enough the importance of testing your map-reduce jobs, but that is a separate topic in itself. In the meantime, I urge you to check out the accompanying integration test.
If you’re interested in seeing how to solve the same problem using other map reduce frameworks, check back with my guide to map-reduce frameworks over the coming weeks. My goal is to create an example for every framework listed.