Hadoop: Custom Input Format

Preface:

Hadoop is a popular open-source distributed computing framework. The data to be processed on top of Hadoop is usually stored on a distributed file system, e.g. HDFS (Hadoop Distributed File System).

To read the data to be processed, Hadoop provides the InputFormat abstraction, which has the following responsibilities:

  • Compute the input splits of the data
  • Provide the logic to read an input split

From an implementation point of view:

InputFormat defines how to read the input data. Its responsibilities are:

  1. Compute the input splits of the data:
    An InputSplit represents the portion of data to be processed by a single Mapper instance. Each Mapper gets a unique input split to process.
    Before the MR job starts, the InputFormat splits the data into multiple parts based on their logical boundaries and the HDFS block size.
    The following method computes the input splits.

    abstract List<InputSplit> getSplits(JobContext context)
  2. Provide the logic to read an input split:
    Each Mapper gets a unique input split to process. The InputFormat provides the logic to read the split in the form of a RecordReader implementation. The record reader reads the input split and emits a <Key, Value> pair as input for each map function call.
    The following method creates a record reader for a given split.

    abstract RecordReader<K, V> createRecordReader(InputSplit split,
        TaskAttemptContext context)

Example: Writing a custom input format to read an email dataset

Let's write a custom input format to read an email dataset. Usually, emails are stored under the user directory in sub-folders like inbox, outbox, spam, sent, etc. An email header contains the sender, receivers, subject, date, message-ID and other metadata fields. We can parse the email header using Java APIs.

In the following example we will read each email file and parse out the sender and receivers. The input key for the map function will be the email participants (sender and receivers) and the input value will be NullWritable.
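
As a quick illustration of what consumes these pairs, here is a minimal mapper sketch. It is not part of the original pipeline; the class name and output types are assumptions chosen for a simple mails-per-sender count.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper: emits one count per email, keyed by sender.
public class EmailCountMapper extends
        Mapper<EmailParticipants, NullWritable, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private Text sender = new Text();

    @Override
    protected void map(EmailParticipants key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        // Each map() call corresponds to exactly one email file (one split).
        sender.set(key.getSender());
        context.write(sender, ONE);
    }
}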

The following class holds the email participants (sender and receivers) and serves as the input key for the map function.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.io.WritableComparable;

// It contains the email participants (sender and receivers)
// and is used as the input key for the map function.
public class EmailParticipants implements WritableComparable<EmailParticipants> {

    private Set<String> receivers = null;
    private String sender;
    private int receiverCnt;

    public EmailParticipants() {
    }

    public EmailParticipants(String sender, Set<String> receivers) {
        this.sender = sender;
        this.receivers = receivers;
        receiverCnt = receivers.size();
    }

    public String getSender() {
        return sender;
    }

    public void setSender(String from) {
        this.sender = from;
    }

    public Set<String> getReceiver() {
        return receivers;
    }

    public void setReceivers(Set<String> receivers) {
        this.receivers = receivers;
    }

    @Override
    public void readFields(DataInput dataIp) throws IOException {
        // Deserialize in the same order as write(): sender first,
        // then the receiver count, then each receiver.
        sender = dataIp.readUTF();
        receiverCnt = dataIp.readInt();
        receivers = new HashSet<String>(receiverCnt);
        for (int i = 0; i < receiverCnt; i++) {
            receivers.add(dataIp.readUTF());
        }
    }

    @Override
    public void write(DataOutput dataOp) throws IOException {
        dataOp.writeUTF(sender); // write the sender, not receivers.toString()
        dataOp.writeInt(receivers.size());
        Iterator<String> rcvr = receivers.iterator();
        while (rcvr.hasNext()) {
            dataOp.writeUTF(rcvr.next());
        }
    }

    @Override
    public int compareTo(EmailParticipants other) {
        // Ordering (and hence grouping) is by sender only.
        return sender.compareTo(other.getSender());
    }
}
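
Since Hadoop serializes this key between tasks, it is worth sanity-checking the write()/readFields() round trip. A minimal standalone check (illustrative, not from the original post) could look like:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;

public class EmailParticipantsRoundTrip {
    public static void main(String[] args) throws IOException {
        EmailParticipants original = new EmailParticipants("alice@example.com",
                new HashSet<String>(Arrays.asList("bob@example.com", "carol@example.com")));

        // Serialize with write() ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // ... and deserialize with readFields().
        EmailParticipants copy = new EmailParticipants();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getSender());   // alice@example.com
        System.out.println(copy.getReceiver()); // both receivers (set order not guaranteed)
    }
}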

The following implementation of the input format will recursively list each file present under the input data directory. Each input split will contain a single email file, so the total number of splits equals the total number of emails.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;

// Input format for reading the email dataset
public class EmailInputFormat extends FileInputFormat<EmailParticipants, NullWritable> {

    @Override
    public RecordReader<EmailParticipants, NullWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new EmailRecordReader();
    }

    @Override
    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        // MapRedUtil (from Apache Pig) expands directories recursively
        // so that every email file under the input directory is listed.
        return MapRedUtil.getAllFileRecursively(super.listStatus(job),
                job.getConfiguration());
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // The record reader reads each email file as a whole, so a file
        // must never be split across mappers: one file, one split.
        return false;
    }
}
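
Note that MapRedUtil.getAllFileRecursively comes from Apache Pig. If you would rather avoid that dependency, a minimal sketch of a recursive listStatus (assuming Hadoop 2.x, where FileStatus.isDirectory() is available; requires java.util.ArrayList and org.apache.hadoop.fs.FileSystem imports) could be dropped into EmailInputFormat instead:

// A Pig-free alternative: expand directories ourselves.
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> files = new ArrayList<FileStatus>();
    for (FileStatus status : super.listStatus(job)) {
        addFilesRecursively(status, job, files);
    }
    return files;
}

// Depth-first walk: directories are expanded, plain files are collected.
private void addFilesRecursively(FileStatus status, JobContext job,
        List<FileStatus> files) throws IOException {
    if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus child : fs.listStatus(status.getPath())) {
            addFilesRecursively(child, job, files);
        }
    } else {
        files.add(status);
    }
}

Newer Hadoop releases also expose the mapreduce.input.fileinputformat.input.dir.recursive switch, which makes FileInputFormat itself recurse into sub-directories.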

The record reader reads the entire split (an email file) in nextKeyValue() in one go and emits <EmailParticipants, NullWritable> as the input arguments for the map function.

import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import javax.mail.Address;
import javax.mail.Session;
import javax.mail.internet.MimeMessage;

import org.apache.commons.mail.util.MimeMessageParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Record reader to parse email participants
public class EmailRecordReader extends
        RecordReader<EmailParticipants, NullWritable> {

    // Each split is exactly one email file, so there is only one
    // key/value pair to emit; this flag remembers whether we did.
    private boolean processed = false;

    private NullWritable value = NullWritable.get();
    private Path file = null;
    private Configuration jc;
    private EmailParticipants emailData = null;

    public EmailRecordReader() {
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        file = split.getPath();
        jc = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false; // the single email was already emitted
        }
        InputStream mailFileInputStream = null;
        try {
            mailFileInputStream = FileSystem.get(jc).open(file);
            Properties props = new Properties();
            Session session = Session.getDefaultInstance(props, null);
            MimeMessage message = new MimeMessage(session, mailFileInputStream);
            // parse() populates the To/Cc/Bcc lists used below.
            MimeMessageParser mimeParser = new MimeMessageParser(message).parse();
            Set<String> receivers = new HashSet<String>();
            populateMailParticipients(mimeParser.getTo(), receivers);
            populateMailParticipients(mimeParser.getCc(), receivers);
            populateMailParticipients(mimeParser.getBcc(), receivers);
            String sender = mimeParser.getFrom().replaceAll("\\.+", ".");
            emailData = new EmailParticipants(sender, receivers);
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            if (mailFileInputStream != null) {
                mailFileInputStream.close();
            }
        }
        processed = true;
        return true;
    }

    private void populateMailParticipients(List<Address> participants,
            Set<String> list) {
        for (Address addr : participants) {
            String str = addr.toString().replaceAll("\\.+", ".");
            list.add(str);
        }
    }

    @Override
    public EmailParticipants getCurrentKey() throws IOException,
            InterruptedException {
        return emailData;
    }

    @Override
    public NullWritable getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Whole-file reader: either nothing or everything has been read.
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to close; the mail input stream is closed in nextKeyValue().
    }
}

A Quick Test:

To test the custom input format class, we have to configure the Hadoop job as:

Job job = new Job(conf, "custom-input-format-test");
job.setInputFormatClass(EmailInputFormat.class);
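
For completeness, here is a fuller driver sketch reusing the illustrative EmailCountMapper from earlier; the driver class name and the argument-based paths are assumptions, not from the original post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmailJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "custom-input-format-test");
        job.setJarByClass(EmailJobDriver.class);

        // Plug in the custom input format and the illustrative mapper.
        job.setInputFormatClass(EmailInputFormat.class);
        job.setMapperClass(EmailCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // args[0]: root of the email dataset, args[1]: output directory.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}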
