Apache Pig: Custom Load Function

Apache Pig is a platform for analyzing large data sets on top of Hadoop. To load a custom input dataset, Pig uses a loader function which loads the data from the file system.

Pig’s loader function uses the specified InputFormat, which splits the input
data into logical splits. The InputFormat in turn uses a RecordReader, which reads each input split and emits <key,value> pairs as input to the map function.

From an implementation point of view:
A custom loader function extends LoadFunc and provides implementations for the following abstract methods:

  1. This method sets the location of the input data.
     abstract void setLocation(String location, Job job)
  2. This method returns the InputFormat class which will be used to split the input data.
     abstract InputFormat getInputFormat()
  3. This method prepares the loader to read data. It takes a record reader and an input split as arguments.
     abstract void prepareToRead(RecordReader rr, PigSplit split)
  4. This method returns the next tuple to be processed.
     abstract Tuple getNext()

Example: Writing a custom load function to read an email dataset

Let’s write a custom load function to read an email data set. Usually emails are stored under the user directory in sub-folders like inbox, outbox, spam, sent, etc. The email header contains sender, receivers, subject, date, message-ID and other metadata fields. We can parse the email header using Java APIs.

In the following example we will read each email file and parse out the sender and receivers. Each emitted tuple will contain (sender,{(receiver-1),(receiver-2),…..(receiver-n)}). If all recipients are undisclosed, it will emit a tuple with an empty bag of recipients: (sender,{}).
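The header-parsing step described above can be sketched with plain JDK string handling. This is a minimal sketch, not the article's actual parser: the class name EmailHeaderParser and the handling of undisclosed recipients are illustrative assumptions (a production loader might instead use the JavaMail MimeMessage API).

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper: extracts sender and receivers from a raw email.
// Only scans the header section (everything before the first blank line).
public class EmailHeaderParser {

	public static String parseSender(String rawEmail) {
		for (String line : rawEmail.split("\r?\n")) {
			if (line.isEmpty()) {
				break; // headers end at the first blank line
			}
			if (line.regionMatches(true, 0, "From:", 0, 5)) {
				return line.substring(5).trim();
			}
		}
		return null;
	}

	public static Set<String> parseReceivers(String rawEmail) {
		Set<String> receivers = new LinkedHashSet<>();
		for (String line : rawEmail.split("\r?\n")) {
			if (line.isEmpty()) {
				break; // headers end at the first blank line
			}
			if (line.regionMatches(true, 0, "To:", 0, 3)) {
				String value = line.substring(3).trim();
				// Undisclosed recipients yield an empty set, matching
				// the (sender,{}) tuple described above.
				if (value.toLowerCase().startsWith("undisclosed")) {
					return receivers;
				}
				for (String addr : value.split(",")) {
					if (!addr.trim().isEmpty()) {
						receivers.add(addr.trim());
					}
				}
			}
		}
		return receivers;
	}

	public static void main(String[] args) {
		String email = "From: alice@example.com\n"
				+ "To: bob@example.com, carol@example.com\n"
				+ "Subject: hello\n\nbody text";
		System.out.println(parseSender(email));
		System.out.println(parseReceivers(email));
	}
}
```

Note that this sketch ignores folded (multi-line) headers and Cc/Bcc fields; it only illustrates how the sender and the receiver set feeding the emitted tuple could be derived.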

Let’s divide this task into two parts:

Part I: Writing a custom Hadoop input format to read the email data set:
Please find the implementation of the custom input format here.

Part II: Writing a custom Pig loader function:
The following implementation of the custom load function reads each directory recursively to process every email file.

import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

/**
 * Loader function which will load input email data.
 */
public class EmailLoaderFunc extends LoadFunc {

	private RecordReader reader;
	private TupleFactory tupleFactory;

	public EmailLoaderFunc() {
		tupleFactory = TupleFactory.getInstance();
	}

	@Override
	public InputFormat getInputFormat() throws IOException {
		return new EmailInputFormat();
	}

	/**
	 * Each tuple will look like (sender,{(receiver1),(receiver2).....}).
	 * (non-Javadoc)
	 * @see org.apache.pig.LoadFunc#getNext()
	 */
	@Override
	public Tuple getNext() throws IOException {
		Tuple tuple = null;
		try {
			boolean notDone = reader.nextKeyValue();
			if (!notDone) {
				return null;
			}
			EmailParticipants edata = (EmailParticipants) reader.getCurrentKey();

			tuple = tupleFactory.newTuple(2);
			tuple.set(0, edata.getSender());
			DataBag dataBag = BagFactory.getInstance().newDefaultBag();
			Set<String> rcvrList = edata.getReceiver();
			for (String s : rcvrList) {
				Tuple t = tupleFactory.newTuple(1);
				t.set(0, s);
				dataBag.add(t);
			}
			tuple.set(1, dataBag);
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
		return tuple;
	}

	@Override
	public void prepareToRead(RecordReader reader, PigSplit pigSplit)
			throws IOException {
		this.reader = reader;
	}

	@Override
	public void setLocation(String location, Job job) throws IOException {
		FileInputFormat.setInputPaths(job, location);
	}
}

A Quick Test:

To quickly test the custom loader function:

  1. Create a jar which contains the load function.
  2. Register the jar in the Pig script.
  3. Load the input data using the custom loader function.

A quick example of an Apache Pig script:

register '/home/hadoop/Desktop/emailcustomloader.jar';

input_data = load '/home/hadoop/data/email-dataset/'
using EmailLoaderFunc() as