Shakin’ Bacon: Using Pig To Process Data

In my last post (see HERE), I introduced the Apache Pig project and showed you the equivalent of the “Hello World” demo in Pig. In this post, we are going to use the GSOD (Global Summary of the Day) station weather reports to calculate the average maximum daily temperature for each station. If you have not loaded the data, please see my previous post on Preparing and Loading data.

Notes & Considerations

  1. You will need to set up and use the PiggyBank UDF (User Defined Function) library. For help setting up and installing it, see my notes HERE.
  2. The GSOD and ISH-History (Weather Stations) data files are both fixed-width files and the required columns will need to be parsed out of the row and converted to the appropriate data type.
  3. Both files have header rows that need to be filtered out.
  4. The files will need to be joined to produce the final result set.
  5. My HDFS working path is /user/Administrator/WeatherData
  6. In my previous post, the Pig Latin was entered directly into the Pig Grunt shell. This demo will use a Pig script.

Getting Started

Querying Weather Stations

  1. Create a new text file in either Notepad or your favorite text editing tool and name the file WeatherDataAverageMaxTemperature.pig.
  2. Since this solution uses UDFs (we’ll cover those in more detail in the next post), we first need to register the PiggyBank jar file and then define the functions our solution requires: SUBSTRING and REPLACE. The TRIM function used later is a Pig built-in and needs no define.
    register 'C:\Hadoop\pig-0.9.3-SNAPSHOT\contrib\piggybank\java\piggybank.jar';
    define SUBSTRING org.apache.pig.piggybank.evaluation.string.SUBSTRING();
    define REPLACE org.apache.pig.piggybank.evaluation.string.REPLACE();
  3. Since both data files we are working with are fixed-width, the initial load for each will simply bring in the entire row as a single string field.
    ish = load '/user/Administrator/WeatherData/Stations.txt' as (row:chararray);
  4. The ISH data file has header rows which must be filtered out. To filter out all the header rows, we will use the SIZE function to keep only rows that are longer than 95 characters.
    ishHeaderless = filter ish by SIZE(row) > 95L;
  5. The next step is to project and transform the data. We will parse out the USAF, WBAN and STATION ID columns, trim any whitespace characters from them, and then convert the USAF and WBAN columns to integers.
    stations = foreach ishHeaderless generate 
    	(int)TRIM(SUBSTRING(row, 0, 6)) AS USAF_ID, 
    	(int)TRIM(SUBSTRING(row, 7, 13)) AS WBAN_ID,
  6. The weather station data is now ready to be joined to the GSOD weather data.
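
Before moving on, it can help to confirm that the header filter behaved as expected. The following is a minimal sketch, assuming you paste it into the Grunt shell after the load and filter statements above. The alias ishSample is a name introduced here for illustration and is not part of the original script.

```pig
-- Show the schema Pig has recorded for the filtered relation.
describe ishHeaderless;

-- ishSample is a hypothetical alias used only for this check.
-- Pull a handful of rows and print them to the console.
ishSample = limit ishHeaderless 5;
dump ishSample;
```

If any header text still shows up in the output, the length threshold in the filter is the first thing to revisit.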

Querying Weather Data

Like the station info we loaded in the previous step, the weather data has headers which must be stripped. We must also parse each row using the SUBSTRING function to extract the data elements we need.

gsod = load '/user/Administrator/WeatherData/2012/xaa' as (row: chararray); 
gsodHeaderless = filter gsod by SIZE(row) > 108L;
weatherData = foreach gsodHeaderless generate 
	(int)TRIM(SUBSTRING(row, 0, 6)) AS STN,
	(int)TRIM(SUBSTRING(row, 7, 13)) AS WBAN, 
	(int)TRIM(SUBSTRING(row, 14, 18)) AS YEAR, 
	(float)TRIM(SUBSTRING(row, 102, 108)) AS MAX_TEMP;
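
The offsets passed to SUBSTRING are zero-based and the stop index is exclusive, so SUBSTRING(row, 0, 6) returns the first six characters of the row. To see the raw slices before they are trimmed and cast, a quick sketch like the one below can be run in the Grunt shell. The aliases gsodSample and gsodRaw and the field names STN_RAW and MAX_TEMP_RAW are made up for this check.

```pig
-- gsodSample and gsodRaw are hypothetical aliases used only for inspection.
gsodSample = limit gsodHeaderless 3;
gsodRaw = foreach gsodSample generate
	SUBSTRING(row, 0, 6) as STN_RAW,          -- characters 0 through 5
	SUBSTRING(row, 102, 108) as MAX_TEMP_RAW; -- characters 102 through 107
dump gsodRaw;
```

Looking at the untrimmed strings makes it easier to spot an offset that is off by one column.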

With the base set, we can refine the query to meet our needs.

  1. The GSOD data files use 9999.9 in the maximum temperature field to indicate no data for any given day. We need to filter these entries out so that they do not skew the overall averages.
    weatherDataFilter = filter weatherData by not MAX_TEMP == 9999.9F;
  2. Next we use the group operator to group the data by STN, WBAN and YEAR.
    weatherDataGrouped = group weatherDataFilter by STN..YEAR;
  3. Now, project and flatten the group and calculate the average for the maximum temperature.
    weatherDataAveraged = foreach weatherDataGrouped {
    	MAX = weatherDataFilter.MAX_TEMP;
    	generate flatten(group), AVG(MAX) as AVERAGE_MAX_TEMP;
    };
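
The nested block is not strictly required for a single aggregate. As a sketch of an equivalent form, the AVG call can take the projected bag directly. The alias weatherDataAveragedAlt is a name made up here so that it does not clash with the script's own relation.

```pig
-- Equivalent to the nested foreach: project MAX_TEMP from each group's bag
-- and average it in a single generate clause.
weatherDataAveragedAlt = foreach weatherDataGrouped generate
	flatten(group),
	AVG(weatherDataFilter.MAX_TEMP) as AVERAGE_MAX_TEMP;
```

The nested form becomes more useful once a group needs extra filtering or ordering before it is aggregated.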

Joining Stations with their Data

The final steps require that we join the two data sets. To perform the join we will use the cogroup operator to bring the relations together. The cogroup operator is similar to join and can potentially perform slightly better because of the way the data is isolated. The results are projected and flattened using foreach before being written to HDFS.

joinedData = cogroup weatherDataAveraged by (STN, WBAN), stations by (USAF_ID, WBAN_ID) inner;
results = foreach joinedData generate
store results into '/user/Administrator/Output/WeatherData' using PigStorage('\t');
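
For comparison, here is roughly how the same pairing could be written with the join operator. This is a sketch of an alternative, not the statement the script uses, and the alias joinedAlt is hypothetical. One difference to keep in mind: the cogroup above marks only stations as inner, while a plain join is inner on both relations.

```pig
-- joinedAlt is a hypothetical alias; this is an alternative to the cogroup,
-- not the statement used in the script.
-- join is inner by default: a key must exist in both relations to be kept.
joinedAlt = join weatherDataAveraged by (STN, WBAN), stations by (USAF_ID, WBAN_ID);
describe joinedAlt;
```

Because join already produces flat tuples, no separate flatten step is needed before storing the result.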

Finishing Up

After you save your Pig script, you are ready to execute it and view the results. If you would like to download the complete script, you may do so HERE. Use the Hadoop command prompt to kick off the Pig script:

pig WeatherDataAverageMaxTemperature.pig

When the script executes, the Pig Latin is translated and optimized into multiple Map/Reduce jobs. The Map/Reduce jobs are then submitted to the Hadoop cluster for processing. This Pig script created two Map/Reduce jobs:

  • One job (9 map/1 reducer) used to process the weather data including calculating the average.
  • One job (2 map/1 reducer) used to process the station data, join the two data sets and generate the output.
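
If you want to see how Pig plans the work before running it, the explain command prints the logical, physical and Map/Reduce plans for an alias. A minimal sketch, assuming the statements from the script have already been entered in the Grunt shell:

```pig
-- Print the plans Pig would use to compute the averaged weather data.
-- No job is submitted; this only shows how the statements map to jobs.
explain weatherDataAveraged;
```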



Wrap Up

In this post, we went beyond the typical “Hello World” Pig demo to show that Pig is a powerful platform for both data processing and exploration. Even so, we are only scratching the surface of the capabilities of this technology. In the next post we are going to look at building and using UDFs to extend this example.

Want more? This demo could easily be extended to extract the latitude and longitude for each station, which could then be plotted for a cool visual.

Till next time!


