Apache Spark Java Tutorial [Code Walkthrough With Examples]

This article was co-authored by Elena Akhmatova


This article is part of my guide to map reduce frameworks in which I implement a solution to a real-world problem in each of the most popular Hadoop frameworks.

Spark is itself a general-purpose framework for cluster computing. It can be run, and is often run, on Hadoop YARN; it is therefore often associated with Hadoop, and so I have included it in my guide to map reduce frameworks as well. Spark is designed to be fast for the interactive queries and iterative algorithms that Hadoop MapReduce can be slow with.

The Problem

Let me quickly restate the problem from my original article.

I have two datasets:

  1. User information (id, email, language, location)
  2. Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)

Given these datasets, I want to find the number of unique locations in which each product has been sold. To do that, I need to join the two datasets together.

Previously I have implemented this solution in Java, with Hive and with Pig. The Java solution was ~500 lines of code; the Hive and Pig solutions were ~20 lines at most.

The Java Spark Solution

This article is a follow-up to my earlier article on Spark, which walks through a Scala Spark solution to the problem. Even though Scala is the native and more popular Spark language, many enterprise-level projects are written in Java, and so Java is supported by the Spark stack with its own API.

This article partially repeats what was written in my Scala overview, although I emphasize the differences between the Scala and Java implementations of what is logically the same code.

As mentioned before, Spark is an open source project that has been built and is maintained by a thriving and diverse community of developers. It started in 2009 as a research project in the UC Berkeley RAD Lab, with the aim of compensating for some of Hadoop’s shortcomings. Spark brings us interactive queries, better performance for iterative algorithms, as well as support for in-memory storage and efficient fault recovery.

It contains a number of different components, such as Spark Core, Spark SQL, Spark Streaming, MLlib, and GraphX. It runs over a variety of cluster managers, including Hadoop YARN, Apache Mesos, and a simple cluster manager included in Spark itself called the Standalone Scheduler. It is used for a wide variety of tasks, from data exploration through to streaming machine-learning algorithms. As a technology stack it has really caught fire in recent years.

Demonstration Data

The tables that will be used for demonstration are called users and transactions.

users
1 matthew@test.com  EN  US
2 matthew@test2.com EN  GB
3 matthew@test3.com FR  FR

and

transactions
1 1 1 300 a jumper
2 1 2 300 a jumper
3 1 2 300 a jumper
4 2 3 100 a rubber chicken
5 1 3 300 a jumper

For this task we use Spark on a Hadoop YARN cluster; our code reads and writes data from/to HDFS. Before starting work with the code, we have to copy the input data to HDFS.

hdfs dfs -mkdir input

hdfs dfs -put ./users.txt input
hdfs dfs -put ./transactions.txt input

Code

All code and data used in this post can be found in my hadoop examples GitHub repository.

public class ExampleJob {
  private static JavaSparkContext sc;

  public ExampleJob(JavaSparkContext sc) {
    this.sc = sc;
  }

  // Turns the (product-id, Optional<location>) values left after the join into (product-id, location) pairs
  public static final PairFunction<Tuple2<Integer, Optional<String>>, Integer, String> KEY_VALUE_PAIRER =
      new PairFunction<Tuple2<Integer, Optional<String>>, Integer, String>() {
        public Tuple2<Integer, String> call(
            Tuple2<Integer, Optional<String>> a) throws Exception {
          // a._2.isPresent() could be checked here; every transaction in the demonstration data has a matching user
          return new Tuple2<Integer, String>(a._1, a._2.get());
        }
      };

  // Joins transactions (user-id -> product-id) with users (user-id -> location),
  // keeps only the (product-id, Optional<location>) values and removes duplicates
  public static JavaRDD<Tuple2<Integer, Optional<String>>> joinData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u) {
    JavaRDD<Tuple2<Integer, Optional<String>>> leftJoinOutput = t.leftOuterJoin(u).values().distinct();
    return leftJoinOutput;
  }

  // Unwraps the Optional to produce (product-id, location) pairs
  public static JavaPairRDD<Integer, String> modifyData(JavaRDD<Tuple2<Integer, Optional<String>>> d) {
    return d.mapToPair(KEY_VALUE_PAIRER);
  }

  // Counts the number of distinct locations per product-id
  public static Map<Integer, Object> countData(JavaPairRDD<Integer, String> d) {
    Map<Integer, Object> result = d.countByKey();
    return result;
  }

  public static JavaPairRDD<String, String> run(String t, String u) {
    // Transactions: key = user-id (column 3), value = product-id (column 2)
    JavaRDD<String> transactionInputFile = sc.textFile(t);
    JavaPairRDD<Integer, Integer> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, Integer, Integer>() {
      public Tuple2<Integer, Integer> call(String s) {
        String[] transactionSplit = s.split("\t");
        return new Tuple2<Integer, Integer>(Integer.valueOf(transactionSplit[2]), Integer.valueOf(transactionSplit[1]));
      }
    });

    // Users: key = user-id (column 1), value = location (column 4)
    JavaRDD<String> customerInputFile = sc.textFile(u);
    JavaPairRDD<Integer, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, Integer, String>() {
      public Tuple2<Integer, String> call(String s) {
        String[] customerSplit = s.split("\t");
        return new Tuple2<Integer, String>(Integer.valueOf(customerSplit[0]), customerSplit[3]);
      }
    });

    // Join, unwrap and count distinct locations per product
    Map<Integer, Object> result = countData(modifyData(joinData(transactionPairs, customerPairs)));

    // Turn the driver-side result map back into an RDD of (product-id, count) strings so it can be saved to HDFS
    List<Tuple2<String, String>> output = new ArrayList<>();
    for (Entry<Integer, Object> entry : result.entrySet()) {
      output.add(new Tuple2<>(entry.getKey().toString(), String.valueOf((long) entry.getValue())));
    }

    JavaPairRDD<String, String> output_rdd = sc.parallelizePairs(output);
    return output_rdd;
  }

  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJoins").setMaster("local"));
    ExampleJob job = new ExampleJob(sc);
    JavaPairRDD<String, String> output_rdd = job.run(args[0], args[1]);
    output_rdd.saveAsHadoopFile(args[2], String.class, String.class, TextOutputFormat.class);
    sc.close();
  }
}

This code does exactly the same thing as the corresponding code in the Scala solution. The sequence of actions is identical, as are the input and output data at each step:

  1. read / transform transactions data
  2. read / transform users data
  3. left outer join of transactions on users
  4. get rid of user_id key from the result of the previous step by applying values()
  5. find distinct() values
  6. countByKey()
  7. transform result to an RDD
  8. save result to Hadoop

If this is confusing (it might be), read the Scala version first; it is much more compact.

As with Scala, it is required to define a SparkContext first. Again, it is enough to set an app name and the location of the master node.

JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));

The resilient distributed dataset (RDD), Spark’s core abstraction for working with data, is named RDD in Java just as in Scala. As with any other Spark data-processing algorithm, all our work is expressed as creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result.
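
To make those three kinds of operations concrete, here is a minimal sketch (not part of the final job) that creates an RDD from the transactions file we copied to HDFS earlier, transforms it, and then runs an action on it:

JavaRDD<String> lines = sc.textFile("input/transactions.txt");              // create an RDD from a file
JavaRDD<String> jumpers = lines.filter(new Function<String, Boolean>() {    // transform it (lazily)
    public Boolean call(String s) { return s.contains("jumper"); }
});
long count = jumpers.count();                                               // action: actually computes a result

Nothing is computed until the count() action runs; textFile() and filter() only describe the work to be done.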

Spark’s key/value RDDs are of the JavaPairRDD type. Key/value RDDs are commonly used to perform aggregations, such as groupByKey(), and are useful for joins, such as leftOuterJoin(). Explicitly defining the key and value elements allows Spark to abstract away a lot of these complex operations (like joins), so they are very useful.
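
As a quick illustration of the aggregation side (a sketch only, not used in the final job), the total purchase amount per user could be computed from the same transactions file with mapToPair() and reduceByKey():

JavaRDD<String> transactionInputFile = sc.textFile("input/transactions.txt");
JavaPairRDD<Integer, Integer> amountByUser = transactionInputFile
    .mapToPair(new PairFunction<String, Integer, Integer>() {
        public Tuple2<Integer, Integer> call(String s) {
            String[] cols = s.split("\t");
            // key = user-id (column 3), value = purchase-amount (column 4)
            return new Tuple2<Integer, Integer>(Integer.valueOf(cols[2]), Integer.valueOf(cols[3]));
        }
    })
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) { return a + b; }
    });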

Here is how the input and intermediate data is transformed into a Key/value RDD in Java:

JavaRDD<String> transactionInputFile = sc.textFile(t);
JavaPairRDD<Integer, Integer> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, Integer, Integer>() {
    public Tuple2<Integer, Integer> call(String s) {
        String[] transactionSplit = s.split("\t");
        return new Tuple2<Integer, Integer>(Integer.valueOf(transactionSplit[2]), Integer.valueOf(transactionSplit[1]));
    }
});

and here is the stand-alone pairing function that is applied to the join output:

public static final PairFunction<Tuple2<Integer, Optional<String>>, Integer, String> KEY_VALUE_PAIRER =
    new PairFunction<Tuple2<Integer, Optional<String>>, Integer, String>() {
      public Tuple2<Integer, String> call(
          Tuple2<Integer, Optional<String>> a) throws Exception {
        // a._2.isPresent() could be checked here before calling get()
        return new Tuple2<Integer, String>(a._1, a._2.get());
      }
    };

Reading input data is done in exactly the same manner as in Scala. Note that the explicit KEY_VALUE_PAIRER transformation is not needed in Scala, but in Java there seems to be no way to skip it.
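
The transformation itself cannot be skipped, but if you are on Java 8 the pairing step can at least be written as a lambda, since PairFunction has a single call() method. A sketch of what modifyData() could then look like:

public static JavaPairRDD<Integer, String> modifyData(JavaRDD<Tuple2<Integer, Optional<String>>> d) {
  // the lambda plays the role of KEY_VALUE_PAIRER
  return d.mapToPair(a -> new Tuple2<Integer, String>(a._1, a._2.get()));
}

The examples in this article stick to anonymous classes so they also compile on older JDKs.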

Spark’s Java API uses an Optional class (similar to Scala’s Option) to box values and avoid nulls. The Optional class has an isPresent() method that lets you check whether a value is present, that is, whether it is not null; calling get() returns the boxed value.
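
In our pipeline calling get() directly is safe, because every transaction in the demonstration data has a matching user. If that were not guaranteed, a defensive variant (a sketch only) could filter out absent values before unwrapping:

public static JavaPairRDD<Integer, String> modifyData(JavaRDD<Tuple2<Integer, Optional<String>>> d) {
  return d.filter(new Function<Tuple2<Integer, Optional<String>>, Boolean>() {
        public Boolean call(Tuple2<Integer, Optional<String>> a) {
          return a._2.isPresent();   // keep only rows where the user was found
        }
      })
      .mapToPair(KEY_VALUE_PAIRER);
}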

The main code is again more or less a chain of pre-defined functions.

public static JavaRDD<Tuple2<Integer,Optional<String>>> joinData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u){
  JavaRDD<Tuple2<Integer,Optional<String>>> leftJoinOutput = t.leftOuterJoin(u).values().distinct();
  return leftJoinOutput;
}
public static JavaPairRDD<Integer, String> modifyData(JavaRDD<Tuple2<Integer,Optional<String>>> d){
  return d.mapToPair(KEY_VALUE_PAIRER);
}
public static Map<Integer, Object> countData(JavaPairRDD<Integer, String> d){
  Map<Integer, Object> result = d.countByKey();
  return result;
}

The processData() function from the Scala version was broken into three new functions: joinData(), modifyData() and countData(). We did this simply to make the code clearer, since Java is verbose. All the data transformation steps could have been put into one function similar to processData() from the Scala solution, as sketched below.
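
For comparison, here is a sketch of what such a single processData()-style function could look like in Java; it chains exactly the same steps, it is just not how the final code is organised:

public static Map<Integer, Object> processData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u) {
  // join on user-id, drop the key, de-duplicate, unwrap the Optional and count locations per product
  return t.leftOuterJoin(u)
          .values()
          .distinct()
          .mapToPair(KEY_VALUE_PAIRER)
          .countByKey();
}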

The leftOuterJoin() function joins two RDDs on their keys.

The values() function lets us drop the key of the key/value RDD, as it is not needed in the operations that follow the join. The distinct() function then removes duplicate tuples.

And finally countByKey() counts the number of countries in which each product was sold. For the demonstration data, the (product, location) pairs after values() are (1, US), (1, GB), (1, GB), (2, FR) and (1, FR); distinct() drops the duplicate (1, GB), and countByKey() then yields {1=3, 2=1}, which is exactly the output shown below.

Running the resulting jar

/usr/bin/spark-submit --class main.java.com.matthewrathbone.sparktest.SparkJoins --master local ./spark-example-1.0-SNAPSHOT-jar-with-dependencies.jar /path/to/transactions.txt /path/to/users.txt /path/to/output_folder

15/12/20 11:49:47 INFO DAGScheduler: Job 2 finished: countByKey at SparkJoins.java:74, took 0.171325 s
CountByKey function Output: {1=3, 2=1}

$ hadoop fs -ls sparkout
Found 9 items
-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/_SUCCESS
-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00000
-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00001
-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00002
-rw-r--r--   1 hadoop hadoop          4 2015-12-20 11:49 sparkout/part-00003
-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00004
-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00005
-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00006
-rw-r--r--   1 hadoop hadoop          4 2015-12-20 11:49 sparkout/part-00007
$ hadoop fs -tail sparkout/part-00003
1 3
$ hadoop fs -tail sparkout/part-00007
2 1
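
The run above uses --master local for simplicity. To submit the same jar to the YARN cluster the input data was copied to, something along these lines should work (a sketch; exact flags depend on your Spark version, and older releases used --master yarn-client instead), with the paths now pointing at HDFS:

/usr/bin/spark-submit --class main.java.com.matthewrathbone.sparktest.SparkJoins --master yarn --deploy-mode client ./spark-example-1.0-SNAPSHOT-jar-with-dependencies.jar input/transactions.txt input/users.txt sparkout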

Testing

The idea and the setup are exactly the same for Java and Scala.

public class SparkJavaJoinsTest implements Serializable {
  private static final long serialVersionUID = 1L;
  private transient JavaSparkContext sc;

  @Before
  public void setUp() {
    sc = new JavaSparkContext("local", "SparkJoinsTest");
  }

  @After
  public void tearDown() {
    if (sc != null){
      sc.stop();
    }
  }

  @Test
  public void testExampleJob() {

    ExampleJob job = new ExampleJob(sc);
    JavaPairRDD<String, String> result = job.run("./transactions.txt", "./users.txt");

    Assert.assertEquals("1", result.collect().get(0)._1);
    Assert.assertEquals("3", result.collect().get(0)._2);
    Assert.assertEquals("2", result.collect().get(1)._1);
    Assert.assertEquals("1", result.collect().get(1)._2);

  }
}

The test is more or less self-explanatory. As usual, we check the content of the output to validate the job’s operation.
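
The assertions above depend on the order in which the pairs come back from collect(). A slightly sturdier variant (a sketch) collects the result once into a map and asserts on keys instead of positions:

Map<String, String> resultMap = result.collectAsMap();
Assert.assertEquals(2, resultMap.size());
Assert.assertEquals("3", resultMap.get("1"));   // product 1 was sold in 3 distinct locations
Assert.assertEquals("1", resultMap.get("2"));   // product 2 was sold in 1 location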

Thoughts

Java is a lot more verbose than Scala, although this is not a Spark-specific criticism.

The Scala and Java Spark APIs have a very similar set of functions. Looking beyond the heaviness of the Java code, the methods are called in the same order and follow the same logical thinking, albeit with more code.

All things considered, if I were using Spark, I’d use Scala. The functional aspects of Spark are designed to feel native to Scala developers, which means it feels a little alien when working in Java (e.g. Optional). That said, if Java is the only option (or you really don’t want to learn Scala), Spark certainly presents a capable API to work with.

Spark Resources

The official Spark site and the Spark GitHub repository have further resources related to Spark.

Further Reading

Learning Spark: Lightning-Fast Big Data Analysis by Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia.
