Hadoop: Custom Input Format


Hadoop is a popular open-source distributed computing framework. The data to be processed on Hadoop is usually stored on a distributed file system, e.g. HDFS (Hadoop Distributed File System).

To read the data to be processed, Hadoop provides InputFormat, which has the following responsibilities:

  • Compute the input splits of data
  • Provide a logic to read the input split

From an implementation point of view:

InputFormat defines how to read the input data. Its responsibilities are:

  1. Compute the input splits of data:
    InputSplit represents the part of the data to be processed by a Mapper instance. Each Mapper gets a unique input split to process.
    Before the MR job starts, InputFormat splits the data into multiple parts based on their logical boundaries and the HDFS block size.
    The following method computes the input splits:

    abstract List<InputSplit> getSplits(JobContext context)
  2. Provide a logic to read the input split:
    Each Mapper gets a unique input split to process. The input format provides the logic to read the split, in the form of a RecordReader implementation. The record reader reads the input split and emits a <Key,Value> pair as input for each map function call.
    The following method creates a record reader for a given split:

    abstract RecordReader<K,V> createRecordReader(InputSplit split,
    TaskAttemptContext context)
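To make the first responsibility concrete, the arithmetic behind split computation can be sketched without Hadoop at all: carve a file length into block-sized (offset, length) pairs, which is the same shape FileInputFormat produces as FileSplits. The `Split` class and `computeSplits` method below are illustrative stand-ins, not Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
	// A simplified stand-in for Hadoop's FileSplit: just an offset and a length.
	static final class Split {
		final long offset, length;
		Split(long offset, long length) { this.offset = offset; this.length = length; }
	}

	// Carve a file of the given length into splits no larger than blockSize.
	static List<Split> computeSplits(long fileLength, long blockSize) {
		List<Split> splits = new ArrayList<>();
		long offset = 0;
		while (offset < fileLength) {
			long len = Math.min(blockSize, fileLength - offset);
			splits.add(new Split(offset, len));
			offset += len;
		}
		return splits;
	}

	public static void main(String[] args) {
		// A 300-byte "file" with a 128-byte block size yields three splits:
		// (0,128), (128,128), (256,44).
		List<Split> splits = computeSplits(300, 128);
		System.out.println(splits.size());
		System.out.println(splits.get(2).offset);
		System.out.println(splits.get(2).length);
	}
}
```

Each split is then handed to one Mapper; the last split is simply whatever remains after the full blocks are carved off.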

Example: Writing a custom input format to read an email dataset

Let's write a custom input format to read an email dataset. Usually emails are stored under the user directory in sub-folders like inbox, outbox, spam, sent, etc. The email header contains sender, receivers, subject, date, message-ID and other metadata fields. We can parse the email header using Java APIs.

In the following example we will read each email file and parse the sender and receivers. The input key for the map function will be the email participants (sender and receivers) and the input value will be NullWritable.

The following class contains the email participants (sender and receivers) and will be the input key for the map function.

// It contains email participants (sender and receivers).
public class EmailParticipants implements WritableComparable<EmailParticipants> {

	private Set<String> receivers = null;
	private String sender;
	private int receiverCnt;

	public EmailParticipants() {
	}

	public EmailParticipants(String sender, Set<String> receivers) {
		this.sender = sender;
		this.receivers = receivers;
		receiverCnt = receivers.size();
	}

	public String getSender() {
		return sender;
	}

	public void setSender(String from) {
		this.sender = from;
	}

	public Set<String> getReceiver() {
		return receivers;
	}

	public void setReceivers(Set<String> receivers) {
		this.receivers = receivers;
		receiverCnt = receivers.size();
	}

	// Deserialize fields in the same order they were written.
	public void readFields(DataInput dataIp) throws IOException {
		sender = dataIp.readUTF();
		receiverCnt = dataIp.readInt();
		receivers = new HashSet<String>(receiverCnt);
		for (int i = 0; i < receiverCnt; i++) {
			receivers.add(dataIp.readUTF());
		}
	}

	// Serialize the sender, the receiver count, then each receiver.
	public void write(DataOutput dataOp) throws IOException {
		dataOp.writeUTF(sender);
		dataOp.writeInt(receiverCnt);
		Iterator<String> rcvr = receivers.iterator();
		while (rcvr.hasNext()) {
			dataOp.writeUTF(rcvr.next());
		}
	}

	public int compareTo(EmailParticipants other) {
		return sender.compareTo(other.getSender());
	}
}
The following input format implementation will recursively read each file present under the input data directory. Each input split will contain a single email file, so the total number of splits equals the total number of emails.

// Input format for reading the email dataset.
public class EmailInputFormat extends
		FileInputFormat<EmailParticipants, NullWritable> {

	public RecordReader<EmailParticipants, NullWritable> createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		return new EmailRecordReader();
	}

	// List all files recursively so emails in sub-folders (inbox, sent, ...)
	// are included.
	protected List<FileStatus> listStatus(JobContext job) throws IOException {
		return MapRedUtil.getAllFileRecursively(super.listStatus(job),
				job.getConfiguration());
	}

	// Each email file is read as a whole, so never split a file (unless it
	// is uncompressed; a compressed file cannot be split anyway).
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodec codec =
				new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
		return codec == null;
	}
}
The record reader reads the entire split (email file) in nextKeyValue() in one go and emits a <Key,Value> pair of <EmailParticipants, NullWritable> as input arguments to the map function.

// Record reader to parse email participants.
public class EmailRecordReader extends
		RecordReader<EmailParticipants, NullWritable> {

	private boolean toggle = false;
	private NullWritable value = NullWritable.get();
	private Path file = null;
	private Configuration jc;
	private EmailParticipants emailData = null;

	public EmailRecordReader() {
	}

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		file = split.getPath();
		jc = context.getConfiguration();
	}

	// Reads the whole email file in one go; returns true on the first call
	// (one record per split) and false on the second.
	public boolean nextKeyValue() throws IOException {
		InputStream mailFileInputStream = null;
		try {
			mailFileInputStream = FileSystem.get(jc).open(file);
			Properties props = new Properties();
			Session session = Session.getDefaultInstance(props, null);
			MimeMessage message = new MimeMessage(session, mailFileInputStream);
			MimeMessageParser mimeParser = new MimeMessageParser(message);
			mimeParser.parse(); // must be called before reading To/Cc/Bcc/From
			Set<String> receivers = new HashSet<String>();
			populateMailParticipients(mimeParser.getTo(), receivers);
			populateMailParticipients(mimeParser.getCc(), receivers);
			populateMailParticipients(mimeParser.getBcc(), receivers);
			String sender = mimeParser.getFrom().replaceAll("\\.+", ".");
			emailData = new EmailParticipants(sender, receivers);
		} catch (MessagingException e) {
			throw new IOException(e);
		} catch (Exception e) {
			throw new IOException(e);
		} finally {
			if (mailFileInputStream != null) {
				mailFileInputStream.close();
			}
		}
		toggle = !toggle;
		return toggle;
	}

	private void populateMailParticipients(List<Address> participants,
			Set<String> list) {
		for (Address addr : participants) {
			String str = addr.toString().replaceAll("\\.+", ".");
			list.add(str);
		}
	}

	public EmailParticipants getCurrentKey() throws IOException,
			InterruptedException {
		return emailData;
	}

	public NullWritable getCurrentValue() {
		return value;
	}

	public float getProgress() throws IOException, InterruptedException {
		return 0; // TODO
	}

	public void close() throws IOException {
		// Nothing to close here; the input stream is closed in nextKeyValue().
	}
}
A Quick Test:

To test the custom input format class, we have to configure the Hadoop job as:

Job job = new Job(conf, "custom-input-format-test");
job.setInputFormatClass(EmailInputFormat.class);