Type-Safe Scalding MapReduce Tutorial - Joining and Summarizing Data
This article is part of my guide to MapReduce frameworks, in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.
This article is my second covering Scalding and should be read together with the previous Scalding article on the ‘fields’ API.
I’ll talk through the differences in Scalding APIs, so some prior knowledge of Scalding is assumed. It might also be useful to refer to my Cascading tutorial as Scalding is built on Cascading and I make a few comparisons.
The Problem
Let me quickly restate the problem from my original article.
I have two datasets:
- User information (id, email, language, location)
- Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)
Given these datasets, I want to find the number of unique locations in which each product has been sold. To do that, I need to join the two datasets together.
Previously I have implemented this solution in Java, with Hive, and with Pig. The Java solution was ~500 lines of code; the Hive and Pig solutions were ~20 lines at most.
The Type Safe Scalding Solution
Scalding is a Scala API developed at Twitter for distributed data programming that uses the Cascading Java API, which in turn sits on top of Hadoop’s Java MapReduce API. Scalding offers several APIs: the Type Safe API, the Fields-based API, and the Matrix API. In my previous Scalding article I presented the solution using the fields-based API. This time we solve the same problem using the Type Safe API and look at the core differences between the solutions.
In general, the Type Safe API is the most popular of the three, as it allows the use of Scala features like case classes and tuples without too many changes.
Demonstration Data
The tables that will be used for demonstration are called `users` and `transactions`.
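The input files are tab-delimited. To give a feel for their shape, here is illustrative sample data (my own values, not necessarily those in the repository):

```
users.txt
1    matthew@test.com     EN    US
2    matthew@test2.com    EN    GB
3    matthew@test3.com    FR    FR

transactions.txt
1    1    1    300    a jumper
2    1    2    300    a jumper
3    1    2    300    a jumper
4    2    3    100    a rubber chicken
5    1    3    300    a jumper
```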
Our code will read and write data from/to HDFS, so before starting work with the code we have to copy the input data onto HDFS. Note that, unlike Cascading (and MapReduce in general), Scalding wants the output directory to be created prior to executing the job.
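Something like the following works, with paths that are my own illustration rather than the repository’s:

```bash
$ hadoop fs -mkdir -p /user/demo
$ hadoop fs -put users.txt /user/demo/users.txt
$ hadoop fs -put transactions.txt /user/demo/transactions.txt
# Unlike plain MapReduce, Scalding wants the output directory to exist already:
$ hadoop fs -mkdir -p /user/demo/output
```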
Code
All code and data used in this post can be found in my Hadoop examples GitHub repository.
I’ve defined two case classes (`User` and `Transaction`) to represent the records in each dataset. You can see immediately that the fields of the classes have types specified, and do not include any Scalding-specific code.
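As a sketch, using the field names from the dataset descriptions above (the exact names in the repository may differ):

```scala
// Plain Scala case classes -- no Scalding-specific code required.
case class User(id: Long, email: String, language: String, location: String)

case class Transaction(transactionId: Long, productId: Long, userId: Long,
                       purchaseAmount: Double, itemDescription: String)
```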
Unfortunately, our data is stored in a delimited format, so to start working with it, we need to convert it to the right types:
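A minimal sketch of that conversion, wrapped in a Job subclass I am calling `LocationCounts` (the class name, the tab delimiter, and the `input-users` argument name are all assumptions of mine):

```scala
import com.twitter.scalding._

class LocationCounts(args: Args) extends Job(args) {

  // Read raw lines, split on tabs, and coerce each column to its target type.
  val users: TypedPipe[User] =
    TypedPipe.from(TextLine(args("input-users")))
      .map { line =>
        val fields = line.split("\t")
        User(fields(0).toLong, fields(1), fields(2), fields(3))
      }

  // transactions is parsed the same way from args("input-transactions")
}
```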
We can then operate on the data by transforming collections of `User` and `Transaction` objects. While the solution is almost identical to my Fields API solution, this version looks much more like ‘regular Scala’.
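Continuing inside the same hypothetical `LocationCounts` job, the core of the solution might look roughly like this (a sketch, not necessarily the exact repository code):

```scala
// Join each transaction to its user, keyed on user id.
val joined = transactions.groupBy(_.userId)
  .join(users.groupBy(_.id))
  .toTypedPipe // TypedPipe[(Long, (Transaction, User))]

// Count the distinct locations in which each product was sold.
joined
  .map { case (_, (transaction, user)) => (transaction.productId, user.location) }
  .distinct
  .groupBy { case (productId, _) => productId }
  .size // one (productId, count) pair per product
  .toTypedPipe
  .write(TypedTsv[(Long, Long)](args("output")))
```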
One difference from regular Scala is worth noting: calling `size` after `groupBy` counts the values within each group, producing one count per key, rather than counting the number of groups, which is what `size` on the `Map` returned by vanilla Scala’s `groupBy` gives. This is a little unintuitive at first.
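To see the contrast, here is a REPL-style illustration of my own:

```scala
// Vanilla Scala: groupBy returns a Map, and .size counts the groups.
val grouped = List("US", "GB", "US").groupBy(identity)
grouped.size              // 2 -- the number of distinct keys

// The vanilla analogue of Scalding's behavior is mapValues(_.size):
grouped.mapValues(_.size) // Map("US" -> 2, "GB" -> 1)

// Scalding's typed API behaves like the second line: pipe.groupBy(f).size
// emits one (key, count) pair per group.
```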
Running the resulting jar
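The invocation looks something like this; the assembly jar name, job class name, and paths are all illustrative:

```bash
$ hadoop jar scalding-example-assembly.jar com.twitter.scalding.Tool \
    LocationCounts --hdfs \
    --input-users /user/demo/users.txt \
    --input-transactions /user/demo/transactions.txt \
    --output /user/demo/output
```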
Testing
Testing is similar to that of the fields-based solution. The main difference can be seen in the line where we declare the typed output sink.
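A sketch using Scalding’s `JobTest` helper, reusing the hypothetical class and argument names from above:

```scala
import com.twitter.scalding._

// (offset, line) pairs, as TextLine expects in tests.
val sampleUsers = List((0, "1\tmatthew@test.com\tEN\tUS"))
val sampleTransactions = List((0, "1\t1\t1\t300\ta jumper"))

JobTest(new LocationCounts(_))
  .arg("input-users", "usersInput")
  .arg("input-transactions", "transactionsInput")
  .arg("output", "outputFile")
  .source(TextLine("usersInput"), sampleUsers)
  .source(TextLine("transactionsInput"), sampleTransactions)
  // The key difference: we declare the output type up front.
  .sink[(Long, Long)](TypedTsv[(Long, Long)]("outputFile")) { buffer =>
    // assertions against buffer go here
  }
  .run
  .finish
```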
The fields-based API decided for itself that our ids were integers, but in the type-safe solution we define this up front.
Differences between the Scalding APIs and their roots
There are three APIs for Scalding: the Fields-based API, the Type Safe API, and the Matrix API, listed here in the order they were introduced; the Matrix API is the newest.
One potential problem with the Fields API is that fields are not strongly typed, so the compiler cannot check them and type mismatches only surface at runtime. We name the fields, but we’re not defining our data structures in quite the same way we do in the typed API.
One reason for having different Scalding APIs is Scalding’s close ties to Cascading. Scalding runs on top of Cascading, so it not only inherits a lot of Cascading’s ideas but also develops alongside the Java framework.
This is evident when using the fields-based API of Scalding, which closely resembles regular Cascading.
For example, here is how we define the schema for a particular dataset in each:
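Both run on the JVM, so we can sketch the two side by side from Scala; the Cascading half calls its Java classes directly, and all names here are illustrative:

```scala
// Scalding fields-based API: a schema is just a tuple of field symbols.
val userSchema = ('id, 'email, 'language, 'location)

// Cascading: the equivalent is a Fields object.
val userFields = new cascading.tuple.Fields("id", "email", "language", "location")
```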
Data parsing also looks similar, although Scalding relies on a functional approach compared to Cascading’s object-oriented style:
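Again a sketch from Scala, assuming a Scalding Job is in scope so the symbol-tuple-to-Fields conversions apply:

```scala
// Scalding fields-based API: a source that parses lines using the schema.
val usersSource = Tsv("users.tsv", userSchema)

// Cascading: parsing is configured on a Scheme, which is attached to a Tap.
val scheme = new cascading.scheme.hadoop.TextDelimited(userFields, "\t")
val usersTap = new cascading.tap.hadoop.Hfs(scheme, "users.tsv")
```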
Neither solution is type safe. If we look at Cascading’s documentation for the TextDelimited class:

> …if a field cannot be coerced into an expected type, a null will be used for the value.
But Cascading has another structure, the Tuple (see the Tuple and TupleEntry classes):

> Tuples work in tandem with Fields and the TupleEntry classes. A TupleEntry holds an instance of Fields and a Tuple. It allows a tuple to be accessed by its field names, and will help maintain consistent types if any are given on the Fields instance.

This more closely resembles Scalding’s typed system, which is sometimes referred to as tuple-based.
Thoughts
As in my prior Scalding walkthrough, the main code and the test code are really concise. The newer API does not make the code heavier; rather, it feels more like ‘regular Scala’. While both the typed and fields-based APIs are totally usable, I would definitely choose the Type Safe API for more complex tasks due to its compile-time safety and more native-Scala feel.
In many ways the Typed API feels pretty similar to Spark, which is a good thing.
Scalding Resources
The Cascading website has resources related to both Cascading and Scalding.
Book:
- Programming MapReduce with Scalding: A practical guide to designing, testing, and implementing complex MapReduce applications in Scala (Packt Publishing), available from Amazon.