MapReduce – First Glance

In my last post, we took a helicopter tour of the MapReduce framework and its many facets. I believe it's important to have a functional understanding of MapReduce even if you never intend to work directly with it, since the more user-friendly abstractions of both Pig and Hive depend on it.

In this post we will again turn to Java as we let our fingers do the walking to build our first MapReduce program. For this demo we will start slowly, first implementing the map and reduce functions before building the job. In subsequent posts, we will build on this demo to implement custom types before turning to more advanced concepts such as status reporting, configuration, partitioning and finally using the Shuffle/Sort phase to implement a secondary sort.

Know before you go

As a brief review, the MapReduce framework is built entirely on the notion of key/value pairs. Using the default input format, each key/value pair sent to the map function consists of a key that represents the byte offset of the record within the file and a value that contains the line of text from the beginning of the line to the line terminator.

The map function is responsible for splitting the line into one or more new key/value pairs, which are then sent on to the Reducer by way of the Shuffle/Sort, which processes the data and emits a summarized or aggregated key/value pair. Obviously, this is a drastic simplification of a very complex and powerful framework, but it gives us an easy starting point.

The Demo

The demo we are going to build will mimic the functionality of the Pig demo built in my previous post (HERE). We will process the NOAA GSOD weather station data to find the maximum temperature for each station. The map function will be responsible for reading and parsing the file. It will enumerate key/value pairs that represent a temperature on a given day for each station. The reduce function will take all the temperature readings for a single station, find the maximum, and then emit a key/value pair for each station with its maximum temperature.

To get started using NetBeans or your favorite Java IDE, create a new project called MapReduceSamples. For this demo, I am using a package called com.pragmaticworks.mapreduce.samples. Two classes will be used to implement this MapReduce job:

  • GSODTemperatureCalculator – contains the core map and reduce function implementations as well as a runnable main function to handle configuration and starting the job
  • GSODRecordParser – abstracts away the logic required to parse the weather data from the GSOD data file. The specifics of this class are not discussed in this post and you can download the complete class HERE; a rough sketch of its interface follows this list.
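
For reference, the mapper (shown below) relies on only a handful of methods from GSODRecordParser. The following is a minimal, hypothetical sketch of that interface based purely on how the map function uses it; the actual parsing logic lives in the downloadable class and is only hinted at in the comments.

import org.apache.hadoop.io.Text;

// Hypothetical outline only - the real, downloadable class handles the
// fixed-width GSOD record layout and all of its edge cases.
public class GSODRecordParser {

    private String key;
    private float maxTemperature;
    private boolean validTemperature;

    // Parse a single GSOD record (one line of text), capturing the station
    // identifiers, year and maximum temperature and building the intermediate
    // key (e.g. 2012-007034-99999).
    public void parse(Text record) {
        // ... extract the fields from the line, flagging records whose
        // temperature value is missing or invalid ...
    }

    // True when the last parsed record contained a usable max temperature.
    public boolean isValidTemperature() {
        return validTemperature;
    }

    // Intermediate key built from the year and station identifiers.
    public String getKey() {
        return key;
    }

    // Maximum temperature read from the record.
    public float getMaxTemperature() {
        return maxTemperature;
    }
}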

The Mapper

The map function is an individual distributed task (pushed to the data) that is responsible for transforming the input record into an intermediate record. To better visualize how this plays out, let’s look at how the input for the GSOD weather data appears to the map function:

<123, 007034 99999  20121104   ….<< OMITTED >>……..    5.1  999.9    91.4*   78.8*  0.00I 999.9  000000>

The key/value pair sent to the map function consists of the byte offset of the record within the file and the record itself. How a record is delimited is determined by the InputFormat that is used. We are using the TextInputFormat, which reads the data file line by line.

Once the map function has a record of data, it will parse the input value to extract the station id, year and maximum temperature. The station id and date info will be used to build the key for the intermediate output, while the maximum temperature will be used as the intermediate value. The result looks something like:

<2012-007034-99999, 91.4>

Note that we are using Text and FloatWritable types for our intermediate output. These types are declared both in the generic interface implementation and in the job configuration (see below). Also of note is the GSODRecordParser, which does all the heavy lifting in terms of parsing the input record. The full class implementation for the mapper is:

public static class Map extends MapReduceBase 
        implements Mapper<LongWritable, Text, Text, FloatWritable> { 

        private GSODRecordParser parser = new GSODRecordParser();

        @Override
        public void map(LongWritable key, Text value, 
            OutputCollector<Text, FloatWritable> output, Reporter reporter) 
                throws IOException {

            // Parse the raw GSOD record (one line of text from the data file)
            parser.parse(value);

            // Emit <station/year key, max temperature> only for valid readings
            if (parser.isValidTemperature())
            {
                output.collect(
                        new Text(parser.getKey()), 
                        new FloatWritable(parser.getMaxTemperature()));
            }
        }
    }

The Reducer

The reduce function runs after the Shuffle/Sort phase and is responsible for processing the intermediate key/value pairs. Before the reduce runs, the intermediate records are grouped by key so that all values for a given key are sent to a single reducer. Since mappers and reducers run separately, data is typically moved across the network from the local map output, through the shuffle/sort, and to the reducer. To better understand how the reducer functions, let’s walk through an example.

Like the map function, the reduce function also receives a key/value pair. Instead of a single value, however, the Shuffle/Sort has packaged all the values for a given key into a collection, which ensures that each key’s data is processed by only one reducer. The input then arrives at the reduce in the form of Key/Iterator<Value> and looks like the following:

<2012-007034-99999,  {77.4, 91.4, 86.2, 100.5, 93.7}>

The reduce function takes this input and, in our case, iterates over the values to find the maximum temperature (here, 100.5). The key and the maximum temperature are then emitted as the final result. The full class implementation is:

public static class Reduce extends MapReduceBase 
            implements Reducer<Text, FloatWritable, Text, FloatWritable> { 
        @Override
        public void reduce(Text key, Iterator<FloatWritable> values, 
            OutputCollector<Text, FloatWritable> output, Reporter reporter) 
                throws IOException { 
            // Start below any possible reading; note that Float.MIN_VALUE is the
            // smallest positive float, so NEGATIVE_INFINITY is used instead
            float maxValue = Float.NEGATIVE_INFINITY;

            // Scan every temperature reported for this key and keep the largest
            while (values.hasNext()) {
                maxValue = Math.max(maxValue, values.next().get());
            }

            output.collect(key, new FloatWritable(maxValue));
        }
    }

The results from the reduce function are written based on the OutputFormat that is configured. The default again is the TextOutputFormat, which writes the key/maximum temperature pairs out to a file. Also note that one output file is written per reduce task regardless of how many input files exist, so one or more output files may exist.
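
As an aside (not something our demo does), the number of reduce tasks can be set explicitly on the job configuration we build below; each reduce task then produces its own part-NNNNN file in the output directory. A quick sketch:

// Hypothetical tweak to the JobConf built in the main method below:
// request two reduce tasks, which would yield two output files
// (part-00000 and part-00001).
conf.setNumReduceTasks(2);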

Before wrapping up reducers, let’s take a moment to discuss combiners. A combiner functions almost exactly like a reducer, except that it runs on the map side and works on the data in memory. The output of the combiner is not guaranteed to be distinct per key like the reduce output; it is only intended to pre-aggregate the data in some way. In our example, where we are finding the maximum temperature for each station, it’s possible to simply reuse our reducer class as the combiner. The idea is to limit the amount of data that has to move through the shuffle/sort to the reduce function by finding the maximum temperature of each mapper’s local data first; if one mapper produced readings of 77.4 and 91.4 for a station, for example, the combiner would forward only 91.4.

The Job

To submit our MapReduce program to the Hadoop framework, it must be packaged in the form of a job. We describe the job using the job configuration so that Hadoop knows things such as the output data types, which input/output formats to use, and the classes that should be used for mapping and reducing, as well as the more obvious things like the job name and the input/output file paths. For this demo the full implementation is below:

public static void main(String[] args) throws Exception {   
        JobConf conf = new JobConf(GSODTemperatureCalculator.class);   
        conf.setJobName("TemperatureCalculator");  

        // Key/value types produced by the map and reduce functions
        conf.setOutputKeyClass(Text.class);  
        conf.setOutputValueClass(FloatWritable.class);  

        // Wire up the mapper and reducer, reusing the reducer as the combiner
        conf.setMapperClass(Map.class);  
        conf.setCombinerClass(Reduce.class);  
        conf.setReducerClass(Reduce.class);  

        conf.setInputFormat(TextInputFormat.class);  
        conf.setOutputFormat(TextOutputFormat.class);  

        // Input and output HDFS paths supplied on the command line
        FileInputFormat.setInputPaths(conf, new Path(args[0]));  
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));  

        JobClient.runJob(conf);         
    }

Wrap-Up

With the code complete (download the complete solution HERE), we must compile the classes and then package them into a *.jar file. The jar file is then submitted to Hadoop using the following command:

hadoop jar mapreducesamples.jar com.pragmaticworks.mapreduce.samples.GSODTemperatureCalculator <input> <output>

The only arguments required are the HDFS path to the input file(s) and the HDFS output path. This demo depends on having the GSOD data available in HDFS; you can read my post on preparing the data HERE. When we execute the job, we see both the map and reduce phases run and then our output.
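
Once the job finishes, the results can be viewed straight from HDFS. As a quick sketch, assuming the default of a single reduce task (and therefore a single part-00000 file in the output directory):

hadoop fs -cat <output>/part-00000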

In my next blog post, we will expand this example to incorporate additional configuration, job reporting and working with the distributed cache.

Till Next Time!

Chris
