Hadoop MapReduce Advanced Python Join Tutorial with Example Code
This article is part of my guide to MapReduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.
One of the articles in the guide, Hadoop Python MapReduce Tutorial for Beginners, has already introduced the reader to the basics of hadoop-streaming with Python. This is the next logical step in the quest to learn how to use Python in the MapReduce framework defined by Hadoop.
The Problem
Let me quickly restate the problem from my original article.
I have two datasets:
- Users (id, email, language, location)
- Transactions (transaction-id, product-id, user-id, purchase-amount, item-description)
Given these datasets, I want to find the number of unique locations in which each product has been sold. To do that, I need to join the two datasets together.
Previously I have implemented this solution in Java, with Hive, and with Pig. The Java solution was ~500 lines of code; the Hive and Pig solutions were ~20 lines at most.
My beginner’s guide to Python MapReduce does not solve this problem, but provides a gentler introduction to running MapReduce with Python. Start there if you’re just getting started with these concepts.
The Python Solution
This solution assumes some preliminary understanding of hadoop-streaming and Python, and uses concepts introduced in my earlier article.
Demonstration Data
As in previous articles (Java MR, Hive and Pig) we use two datasets called users and transactions.
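The data files themselves aren’t reproduced here, but to keep the examples below concrete, tab-separated files matching those schemas could look like this (all values are made up; the real files are in the GitHub repository):

```
users.txt  (id, email, language, location)
1    matthew@test.com     EN    US
2    jane@example.com     EN    GB
3    pierre@example.com   FR    FR

transactions.txt  (transaction-id, product-id, user-id, purchase-amount, item-description)
1    1    1    300    a jumper
2    1    2    300    a jumper
3    1    2    300    a jumper
4    2    3    100    a rubber chicken
5    1    3    300    a jumper
```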
One big difference with Python MapReduce is that we treat them as a single dataset when we are writing our Mapper. I will show you how just below.
To start, let’s upload these files to HDFS.
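For example (the file names and the HDFS directory are my own choices; adjust for your cluster):

```bash
# Create an input directory in HDFS and copy both files up.
hdfs dfs -mkdir -p input
hdfs dfs -put users.txt transactions.txt input/
```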
Code
This job logically has two parts, so I will divide the code in the same way. First we solve the problem of joining the two datasets to associate a location with each purchase, and second we use this joined dataset to evaluate how many unique locations each product has been sold in.
The code for both parts of the solution and the data used in this post can be found in my GitHub repository.
Part 1: Joining
Mapper:
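The exact script is in the GitHub repository; here is a minimal sketch of what such a mapper can look like (the script name and the "-" placeholder convention are my own choices, not necessarily the author’s):

```python
#!/usr/bin/env python3
# join_mapper.py -- a minimal sketch (not the exact code from the repository).
# Reads both datasets from stdin and emits: user_id <tab> product_id <tab> location
import sys

for line in sys.stdin:
    fields = line.rstrip("\n").split("\t")
    if len(fields) == 5:
        # Transaction record: transaction-id, product-id, user-id, purchase-amount, item-description
        _, product_id, user_id, _, _ = fields
        # We don't know the location yet, so emit a placeholder in the third column.
        print(f"{user_id}\t{product_id}\t-")
    elif len(fields) == 4:
        # User record: id, email, language, location
        user_id, _, _, location = fields
        # "-" sorts before numeric product ids, so with the first two fields used
        # as the key, a user's location record reaches the reducer before that
        # user's transactions.
        print(f"{user_id}\t-\t{location}")
```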
Reducer:
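And a matching reducer sketch, relying on the "-" placeholder convention from the mapper above:

```python
#!/usr/bin/env python3
# join_reducer.py -- a minimal sketch (not the exact code from the repository).
# Input, sorted by (user_id, product_id): user_id <tab> product_id <tab> location
# Output: product_id <tab> location
import sys

current_user = None
current_location = None

for line in sys.stdin:
    user_id, product_id, value = line.rstrip("\n").split("\t")
    if product_id == "-":
        # User record: remember this user's location for the transactions that follow.
        current_user = user_id
        current_location = value
    elif user_id == current_user:
        # Transaction record for the user we just saw: attach the remembered location.
        print(f"{product_id}\t{current_location}")
    # Transactions for a user with no user record are silently dropped in this sketch.
```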
The Mapper reads both datasets and distinguishes them by the number of fields in each row. Transaction records have 5 fields, users have only 4.
The mapper does two things:
- For transactions - extract the user_id and product_id
- For users - extract the user_id and the location

The mapper outputs three fields: user_id, product_id, location.
The output will look something like this:
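Using the made-up sample files and the sketch mapper above, the raw map output would be roughly as follows (the "-" columns are the placeholder values from the sketch):

```
1    -    US
2    -    GB
3    -    FR
1    1    -
2    1    -
2    1    -
3    2    -
3    1    -
```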
By using a feature of the streaming API we can tell Hadoop to treat BOTH of the first two fields as a combined key. This allows us to guarantee the order in which the reducer will receive data:
- User record with location (now we can remember the location)
- Each user purchase in turn, ordered by product id.
We do this by specifying an option on the command line: -Dstream.num.map.output.key.fields=2.
If we want to test this without Hadoop we can just use sort.
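For example, with the sketch scripts and sample file names used above (make the scripts executable first with `chmod +x *.py`):

```bash
# LC_ALL=C gives plain byte ordering, so the "-" placeholder sorts before numeric product ids.
cat users.txt transactions.txt | ./join_mapper.py | LC_ALL=C sort
```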
The output will look like this (I added notes):
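Continuing with the made-up data, the sorted stream groups each user’s location record immediately ahead of that user’s purchases:

```
1    -    US     <- user 1's location
1    1    -      <- user 1 bought product 1
2    -    GB     <- user 2's location
2    1    -
2    1    -
3    -    FR     <- user 3's location
3    1    -
3    2    -
```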
For each new user the Reducer will first remember that user’s location, and then add this location to each of that user’s transactions.
So the reducer will take input of the form (user_id, product_id, location), like the sorted listing above.
It extracts the location and associates it with each product id to produce product/location pairs like this:
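With the illustrative sample data, that gives:

```
1    US
1    GB
1    GB
1    FR
2    FR
```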
We can run the whole join pipeline easily without using Hadoop:
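With the sketch scripts above (names are my own), the whole join stage becomes a shell one-liner:

```bash
cat users.txt transactions.txt | ./join_mapper.py | LC_ALL=C sort | ./join_reducer.py
```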
And get a list of product/location pairs for stage 2. This shows the location of the purchaser (user) for each transaction, keyed by product ID. Each product is repeated as many times as it appeared in a transaction.
Stage 2: Counting Distinct Locations for each Product
Mapper:
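The mapper has no real work to do; a sketch would just be an identity pass-through (see the note below about using cat instead):

```python
#!/usr/bin/env python3
# location_mapper.py -- identity mapper sketch; `cat` does the same job.
import sys

for line in sys.stdin:
    # Pass each "product_id <tab> location" record through unchanged.
    sys.stdout.write(line)
```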
In fact we can just use cat here if we like.
Reducer:
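A sketch of the counting reducer (again, the script name and details are my own; the article’s exact code is in the repository):

```python
#!/usr/bin/env python3
# location_reducer.py -- a minimal sketch (not the exact code from the repository).
# Input, sorted by product_id: product_id <tab> location
# Output: product_id <tab> number of distinct locations
import sys

current_product = None
locations = set()

for line in sys.stdin:
    product_id, location = line.rstrip("\n").split("\t")
    if product_id != current_product:
        # New key: flush the previous product's count and reset the bookkeeping.
        if current_product is not None:
            print(f"{current_product}\t{len(locations)}")
        current_product = product_id
        locations = set()
    locations.add(location)

# Flush the final product.
if current_product is not None:
    print(f"{current_product}\t{len(locations)}")
```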
Our mapper just echoes its input and the bulk of the work happens in the reducer. In the Reducer phase we again exploit the fact that entries are ordered by key. Notice that unlike regular MapReduce this reducer’s API does not distinguish between keys and receives all of them in one long list, so our reducer has to do its own bookkeeping. See my beginners article for more of an explanation.
So we go through the list and count the number of locations we see for each product_id, whilst making sure we transition between products properly.
Running the code
Again, this is easy to test without Hadoop:
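Using the sketch scripts, both stages chain together on the command line (the identity mapper could equally be omitted or replaced with cat):

```bash
cat users.txt transactions.txt | ./join_mapper.py | LC_ALL=C sort | ./join_reducer.py \
    | ./location_mapper.py | sort | ./location_reducer.py
```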
Or using hadoop-streaming in two steps:
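A sketch of the two jobs (the streaming jar location, HDFS paths and script names are assumptions and will differ between installations; the scripts must be executable or wrapped with an explicit interpreter):

```bash
# Step 1: the join. The first two tab-separated fields form the key, so each
# user's location record arrives at the reducer before that user's transactions.
# One reducer keeps the example simple; with several reducers you would also
# partition on the first field only (KeyFieldBasedPartitioner).
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -D stream.num.map.output.key.fields=2 \
    -D mapreduce.job.reduces=1 \
    -files join_mapper.py,join_reducer.py \
    -mapper join_mapper.py \
    -reducer join_reducer.py \
    -input input \
    -output join-output

# Step 2: count distinct locations per product, reading step 1's output.
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files location_mapper.py,location_reducer.py \
    -mapper location_mapper.py \
    -reducer location_reducer.py \
    -input join-output \
    -output location-counts
```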
The result in both cases is correct:
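With the made-up sample data, product 1 was bought in three distinct locations (US, GB, FR) and product 2 in one (FR), so the final output would read:

```
1    3
2    1
```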
Testing
Testing Hadoop streaming pipelines is harder than testing regular MapReduce pipelines because our tasks are just scripts. There are some more structured Python frameworks that help in both development and testing of Hadoop pipelines.
Thoughts
Our Python code is very legible and does not require as much boilerplate as regular Java MapReduce. That said, it does require some bookkeeping to make sure the mappers and reducers work correctly. The ability to quickly test using the command line is very nice, as is the inclusion of a lot of functionality in the streaming API jar itself.
To be frank, I would avoid using Python streaming to write MapReduce code for Hadoop. If you have to use Python I suggest investigating a Python framework like Luigi or MRJob. Hadoop streaming is powerful, but without a framework there are lots of easy ways to make mistakes and it’s pretty hard to test.
Hadoop Streaming Resources
Documentation on Hadoop Streaming by Apache.
Check out the O’Reilly book ‘Hadoop with Python’ by Donald Miner.