Apache Pig : Writing a User Defined Function (UDF)

Preface:

In this post we will write a basic/demo custom function for Apache Pig, called as UDF (User Defined Function).
Pig’s Java UDF extends functionalities of EvalFunc. This abstract class have an abstract method “exec” which user needs to implement in concrete class with appropriate functionality.

Problem Statement:

Lets write a simple Java UDF which takes input as Tuple of two DataBag and check whether second databag(set) is subset of first databag(set).
For example, Assume you have been given tuple of two databags. Each DataBag contains elements(tuples) as number.

Input:
Databag1 : {(10),(4),(21),(9),(50)}
Databag2 : {(9),(4),(50)}
Output:
True

Then function should return true as Databag2 is subset of Databag1.

From implemetation point of view

As we are extending abstract class EvalFucn, we will be implementing exec function. In this function we’ll write logic to find is given set is subset of other or not. We will also override function outputSchema to specify output schema ( boolean : true or false ).


import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

/**
 * Find the whether given SetB is subset of SetA.
 * <p>
 * 	input:
 * <br>setA : {(10),(4),(21),(9),(50)}</br>
 * <br>setB : {(9),(4),(50)}</br>
 * <br></br>
 *  output:
 * <br>true</br>
 * </p>
 * @author https://shrikantbang.wordpress.com
 *
 */
public class IsSubSet extends EvalFunc {

	@Override
	public Schema outputSchema(Schema input) {
		if(input.size()!=2){
			throw new IllegalArgumentException("input should contains two elements!");
		}

		List fields = input.getFields();
		for(FieldSchema f : fields){
			if(f.type != DataType.BAG){
				throw new IllegalArgumentException("input fields should be bag!");	
			}
		}
		return new Schema(new FieldSchema("isSubset",DataType.BOOLEAN));
	}

	private Set populateSet(DataBag dataBag){
		HashSet set = new HashSet();

		Iterator iter = dataBag.iterator();
		while(iter.hasNext()){
			set.add(iter.next());
		}
		return set;
	}

	@Override
	public Boolean exec(Tuple input) throws IOException {

		Set setA = populateSet((DataBag) input.get(0));
		Set setB = populateSet((DataBag) input.get(1));
		return setA.containsAll(setB) ? Boolean.TRUE : Boolean.FALSE;

	}

}

A Quick Test

Lets test our UDF to find whether given set is subset of other set or not.


-- Register jar which contains UDF.
register '/home/hadoop/udf.jar';

-- Define function for use.
define isSubset IsSubSet();

-- lets assume we have dataset as following :
 dump datset;
--({(10),(4),(21),(9),(50)},{(9),(4),(50)})
--({(50),(78),(45),(7),(4)},{(7),(45),(50)})
--({(1),(2),(3),(4),(5)},{(4),(3),(50)})

-- lets check subset function
result = foreach dataset generate $0,$1,isSubset($0,$1);

dump result;
--({(10),(4),(21),(9),(50)},{(9),(4),(50)},true)
--({(50),(78),(45),(7),(4)},{(7),(45),(50)},false)
--({(1),(2),(3),(4),(5)},{(4),(3),(50)},false)


12 thoughts on “Apache Pig : Writing a User Defined Function (UDF)

  1. Hey shrikant, i have the pig installed in my CDH , but o do not find the following jars, where will i find them :
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

    • oops i have found it. They come with the pig instalable. The pig-0.12.1.jar is the jar that you need to import to get all these packages in your program. Thanks.

  2. very nice explanation….
    thanks for sharing this articles…………….
    I have onw Q
    How to add elephant bird library in my project ?

  3. Hi Shrikant,

    Thanks for the code. I am new in apache pig and having problem in understanding following things in your code:

    1) The input parameters in outputSchema() i.e. Schema type.
    2) Why you are throwing exception when input.size()!=2? Why 2 specially?
    3) What data will come in “input” variable in outputSchema() method?
    4) What will be the overall output of the class? Will it be the return type of exec() method of return type of outputSchema() method?
    5) Please give me the dataset for this code.

    Thanks in advance Shrikant.

Leave a reply to vijay k Cancel reply