MMM More Bacon – Pig User-Defined Functions (UDFs)

Okay…okay…I know…the pig jokes are lame and getting old by now…maybe a picture of a kitten dressed like a pig will cheer you up. Luckily, this is the last of my introductory Pig posts before moving on to MapReduce. In this post we are going to spend some time creating and playing around with Pig User-Defined Functions (UDFs). We will look at what they are, how they are developed, and ultimately how they are leveraged as operators within your Pig Latin scripts. So without further ado…..

What is a Pig UDF?

The Pig UDF is an extensibility feature that allows you to write your own code to perform custom data processing in one of four areas:

  • Eval – The most common type of UDF. Typically used to alter a field, such as forcing a string to upper-case or, as we saw in my prior post, parsing a string using substring.
  • Filter – Another common UDF type, this function always returns a true/false value. An example of this type could be a function that filters weather data by precipitation or snowfall (a minimal sketch of one appears just after this list).
  • Group/Aggregate – An extension of the Eval function, this function type allows you to build aggregation operations on sets. Examples of this type include the built-in COUNT, MIN, MAX and AVG.
  • Load/Store – A function type that controls how data is either loaded into or stored out of Pig. This function is built on top of the InputFormat & OutputFormat classes within Hadoop, which abstract away some of the complexity of reading and writing data.
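
For a taste of the Filter type, here is a minimal sketch of the precipitation example mentioned above. The HasPrecipitation class name, and the assumption that the reading arrives as a single float field, are mine for illustration, not part of the demo that follows:

import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

//Hypothetical filter: keep rows whose precipitation reading
//(passed as the first field) is greater than zero.
public class HasPrecipitation extends FilterFunc {
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0 || tuple.get(0) == null)
            return false;

        return ((Float) tuple.get(0)) > 0f;
    }
}

You would then invoke it like any other operator, e.g. rainy = filter weather by HasPrecipitation(PRECIP);.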

These four areas make UDFs particularly powerful: not only do they extend what’s possible, they can also be used with almost any operator available within Pig. In fact, UDFs are very common in Pig scripts. We have already used the SUBSTRING and AVG functions, which are both UDFs. Some UDFs are built in, and others, such as SUBSTRING, ship in community-maintained, open-source libraries. The two you most want to be familiar with are the Piggybank library (part of the Pig contrib – LINK and HERE) and LinkedIn’s DataFu library (LINK).

While a lot can be written about those libraries, this post is about building your own UDF, so the next logical question is, “How do I build my own chunk of awesomeness?” Well, you have to write some code for that to happen. UDFs can be written in Java, JavaScript or Python; the most extensive support, however, exists within the Java realm. I can feel all my Microsoft .NET friends squirming already. Don’t fret though, you will see shortly that UDFs are actually very approachable, especially if you have a background in or can read and understand C#.
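
To give you a sense of just how approachable, here is a complete, if trivial, Eval function that upper-cases a chararray field. ToUpper is a hypothetical class name used for illustration, not a built-in:

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

//Hypothetical one-method Eval UDF: upper-case the first field.
public class ToUpper extends EvalFunc<String> {
    @Override
    public String exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0 || tuple.get(0) == null)
            return null;

        return ((String) tuple.get(0)).toUpperCase();
    }
}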

Building your first UDF: Eval Function

WARNING: this is no “Hello World!” demo. To play along I recommend that you find and use a Java IDE. I am still scarred by my time spent using WebSphere development studio, so I have chosen to use NetBeans (https://netbeans.org/), which is free and should feel somewhat familiar if you are comfortable in Visual Studio.

In this demo, we are going to write a function to simplify the Pig scripts we wrote in my prior blog (HERE). The first function we will write will take a row of input data (in the form of a tuple with one field) from the weather stations data file. The function will be responsible for filtering out any header rows and then parsing the row into individual fields. The fields will be returned as a single tuple. For this first example we will ignore the data schema in the UDF and instead project the schema onto the data within the Pig script.

  1. Start by creating a new Java class library project. Once the project is created, you want to create a new package within the project (Do not use the Default package). For this demo, I am using com.pragmaticworks.pig.udfs for both my project and package name. You will also need to add the pig-0.9.3-SNAPSHOT.jar to the libraries for your project.
  2. Create a new class file named WeatherStationParser.
  3. Next we need to add the required imports so that we have access to all the classes and interfaces needed. See the code listing below for the four import statements required.
    (If you find yourself needing to resolve references and are familiar with Visual Studio’s function for resolving the using or import statements, NetBeans has a similar function on the context menu called “Fix Imports”.)
  4. The WeatherStationParser class needs to extend or inherit from the EvalFunc<T> abstract class. This class has a single abstract method called exec(Tuple) which you must provide an implementation for. The implementation of this method is what determines the behavior that your UDF will have. In this example, we handle the parsing of the row into its distinct fields. The full code for the class is provided below with a detailed explanation included in the code comments.
    package com.pragmaticworks.pig.udfs;
    
    import java.io.IOException; 
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    
    public class WeatherStationParser extends EvalFunc<Tuple> {
        TupleFactory tupleFactory = TupleFactory.getInstance();
    
    /*
     * The exec function takes a tuple as an argument. Remember that
     * a tuple is simply a collection of fields that are primitive types.
     */
        @Override
        public Tuple exec(Tuple tuple) throws IOException {
            //Expect the unexpected and then handle it gracefully
            if (tuple == null || tuple.size() != 1)
                return null;
    
            try{
                //Get the data row
                String row = (String)tuple.get(0);
    
                /*
                 * Kludgy...but effective: filter header rows
                 * by length. A regex pattern match would be more robust.
                 */
                if (row.length() > 95)
                {
                    //Use the tuple factory to create a new tuple                                
                    Tuple t = tupleFactory.newTuple();
    
                    t.append(GetIntFromString(row, 0, 6)); //USAF_ID
                    t.append(GetIntFromString(row, 7, 13)); //WBAN_ID
                    t.append(ParseString(row, 13, 43)); //STATION_ID       
                    t.append(GetLatitude(GetFloatFromString(row, 58, 64))); //LAT
                    t.append(GetLongitude(GetFloatFromString(row, 65, 73))); //LON
    
                    //Return the tuple
                    return t;
                } else {
                    /*
                     * For header rows, we will simply return NULL
                     * we will filter the NULLs out in the query
                     */                
                    return null;
                }
            } catch (Exception ex)
            {
                /*
                 * Uh-oh something went wrong....
                 * Return NULL...consider logging this error
                 * if it's important
                 */
                return null;
            }
        }
    
        /*
         * Helper Methods
         */
        private Float GetLatitude(float val)
        {
            //Filter out NULL Latitudes
            if (val == 0f || Math.abs(val) == 99999f)
                return null;
    
        //Latitude arrives in thousandths of a degree; convert to decimal degrees
            return val/1000f;
        }
    
        private Float GetLongitude(float val)
        {
            //Filter out NULL Longitudes
            if (val == 0f || Math.abs(val) == 999999f)
                return null;
    
        //Longitude arrives in thousandths of a degree; convert to decimal degrees
            return val/1000f;
        }
    
        private String ParseString(String s, int start, int stop)
        {
        //Extract the substring and trim any padding
            return s.substring(start, stop).trim();
        }
    
        private int GetIntFromString(String s, int start, int stop)
        {
            /*
             * Parse the string and remove the '+' plus sign 
             * before we convert it to an integer
             */        
            String substring = s
                    .substring(start, stop)
                    .replace("+", "")
                    .trim();
            return Integer.parseInt(substring);
        }
    
        private float GetFloatFromString(String s, int start, int stop)
        {
            /*
             * Parse the string and remove the '+' plus sign 
         * before we convert it to a float
             */
            String substring = s
                    .substring(start, stop)
                    .replace("+", "")
                    .trim();
            return Float.parseFloat(substring);
        }
    }
  5. With your class complete, you are ready to build it and package it into a jar file. If you are using NetBeans, your compiled class and the jar file will be in the dist folder, which is found wherever you created your project (the default project location is C:\Users\{YOUR USER}\Documents\NetBeansProjects\). You can reference the jar from this location, move the jar to a folder on the Pig class path, or even register a new class path.
  6. Now we can readdress our Pig script from the prior post and rewrite it using our new UDF. First, you will need to register the jar file and then define the function. When you define the function, you will need to use the fully-qualified class name, including the package. Because the UDF returns its result as a single tuple, we use FLATTEN to un-nest the fields and then project a schema onto the data using an alias. We can rewrite the script as follows:
    register 'C:\Hadoop\libraries\com.pragmaticworks.pig.udfs.jar';
    define StationParser com.pragmaticworks.pig.udfs.WeatherStationParser();
    
    ish = load '/user/Administrator/WeatherData/Stations.txt' as (row:chararray);
    stations = foreach ish generate flatten(StationParser(row)) as 
    	(USAF_ID: int, WBAN_ID: int, STATION_ID: chararray,
    	LATITUDE: float, LONGITUDE: float);
    stationsFiltered = filter stations by USAF_ID is not NULL;
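
Before wiring the jar into a script, it can be handy to sanity-check the parser from plain Java. Here is a sketch of a throwaway test class (placed in the same package as the UDF) that builds a hypothetical fixed-width row laid out to match the offsets the UDF expects; the station values are sample data, not lines from the real file:

package com.pragmaticworks.pig.udfs;

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

//Hypothetical smoke test: build a 100-character row matching the
//offsets WeatherStationParser expects and call exec() directly.
public class WeatherStationParserTest {
    public static void main(String[] args) throws Exception {
        StringBuilder row = new StringBuilder(String.format("%-100s", ""));
        row.replace(0, 6, "722950");                                     //USAF_ID
        row.replace(7, 13, "23174 ");                                    //WBAN_ID
        row.replace(13, 43, String.format("%-30s", "LOS ANGELES INTL")); //STATION_ID
        row.replace(58, 64, "+33938");                                   //LAT (thousandths)
        row.replace(65, 73, "-118389 ");                                 //LON (thousandths)

        Tuple input = TupleFactory.getInstance().newTuple(1);
        input.set(0, row.toString());

        //Prints (722950,23174,LOS ANGELES INTL,33.938,-118.389)
        System.out.println(new WeatherStationParser().exec(input));
    }
}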

Extending the UDF: Creating a schema

In the prior UDF, we implemented only the exec(Tuple) function to parse out the station data. As a result, we have to define the data schema within the script. While this provides an added measure of flexibility, it also requires that we do more in the Pig script (and if you’re going through the trouble of building a UDF, why not include the schema?).

We will create a new UDF to parse the weather data. To prevent this post from turning into a novel, you can download the completed class HERE. In this function we are adding a schema to the function, which requires that we override the outputSchema(Schema) method. The example of this method as it is used in the WeatherDataParser class (download it HERE) is hopefully straightforward:

    @Override
    public Schema outputSchema(Schema input){
        try {
            Schema tupleSchema = new Schema();

            tupleSchema.add(new Schema.FieldSchema("STN", DataType.INTEGER));
            tupleSchema.add(new Schema.FieldSchema("WBAN", DataType.INTEGER));
            tupleSchema.add(new Schema.FieldSchema("YEAR", DataType.INTEGER));
            tupleSchema.add(new Schema.FieldSchema("MAX_TEMP", DataType.FLOAT));

            return tupleSchema;
        } catch (Exception e)
        {
            //Safely return nothing
            return null;
        }
    }
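
Note that the version above returns the field list directly. Since exec returns a Tuple, many Pig UDF examples instead wrap the fields in a named, tuple-typed FieldSchema so that the function's result reads as a single tuple column, which FLATTEN can then un-nest. A sketch of that variant, using the getSchemaName helper inherited from EvalFunc (adapt to your Pig version):

    @Override
    public Schema outputSchema(Schema input) {
        try {
            Schema tupleSchema = new Schema();

            tupleSchema.add(new Schema.FieldSchema("STN", DataType.INTEGER));
            tupleSchema.add(new Schema.FieldSchema("WBAN", DataType.INTEGER));
            tupleSchema.add(new Schema.FieldSchema("YEAR", DataType.INTEGER));
            tupleSchema.add(new Schema.FieldSchema("MAX_TEMP", DataType.FLOAT));

            //Wrap the fields in a named, tuple-typed schema
            return new Schema(new Schema.FieldSchema(
                    getSchemaName(this.getClass().getName().toLowerCase(), input),
                    tupleSchema, DataType.TUPLE));
        } catch (Exception e) {
            //Safely return nothing
            return null;
        }
    }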

Notice that we define the structure of the tuple, including a field/column name and the primitive data type for each field. Now when we do a DESCRIBE on a relation produced by the function, the schema is reported for us instead of being unknown. Note that in this demo we worked exclusively with Tuples, but a UDF can produce any of the supported data structures (Bag, Tuple or Map); a sketch of a Bag-producing function follows.
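
Here is what a Bag-returning function might look like. SplitWords is a hypothetical class that breaks a chararray into one single-field tuple per word, similar in spirit to the built-in TOKENIZE:

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

//Hypothetical Bag-returning UDF: one single-field tuple per word.
public class SplitWords extends EvalFunc<DataBag> {
    TupleFactory tupleFactory = TupleFactory.getInstance();
    BagFactory bagFactory = BagFactory.getInstance();

    @Override
    public DataBag exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0 || tuple.get(0) == null)
            return null;

        DataBag bag = bagFactory.newDefaultBag();
        for (String word : ((String) tuple.get(0)).split("\\s+"))
            bag.add(tupleFactory.newTuple(word));

        return bag;
    }
}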


Wrapping-Up

With our two new UDFs we can now clean up the script from the previous post. The new script is cleaner and certainly easier to work with. In fact, we’ve expanded the scope from the prior post to include the latitude and longitude for each station, so those who are so inclined can use Sqoop to pull the data out of Hadoop and then map it with Power View or GeoFlow. The new Pig script can be downloaded HERE.

This post only scratches the surface of what’s possible with UDFs. We could have written a data loader to perform a similar task, implemented more sophisticated calculations on the weather data, or even built a filter to identify days and locations where fog, precipitation or snow was present. I hope these last few posts help you get started with Pig and further your interest in HDInsight.

Till next time!

Chris
