Size Tiered Compaction Strategy In Apache Cassandra

Preface:

Compaction combines multiple SSTables into a single SSTable to improve the performance of read operations by reducing disk I/O and to reclaim space occupied by deleted data.
In this article we will discuss the size tiered compaction strategy.

Size Tiered Compaction Strategy:

Size tiered compaction strategy combines multiple SSTables that fall within a particular size range.
The technique categorizes SSTables into different buckets, and the compaction process runs for each bucket individually.

    Compaction is based on the following properties:

  • bucket_high (default 1.5)
    An SSTable is added to a bucket if its size is less than 1.5 times the average size of that bucket.
    e.g. if an SSTable is 14MB and the bucket average is 10MB, the SSTable is added to that bucket and the bucket's new average size is computed.
  • bucket_low (default 0.5)
    An SSTable is added to a bucket if its size is greater than 0.5 times the average size of that bucket.
    e.g. if an SSTable is 7MB and the bucket average is 10MB, the SSTable is added to that bucket and the bucket's new average size is computed.
  • max_threshold (default 32) The maximum number of SSTables allowed in a bucket for compaction.
  • min_threshold (default 4) The minimum number of SSTables a bucket must contain to start a compaction.
  • min_sstable_size (default 50MB) The bucketing process groups SSTables whose sizes are within 50% of each other (between bucket_low and bucket_high times the bucket average), which is too fine grained for small SSTables. All SSTables that do not match this criterion and are smaller than min_sstable_size are therefore placed into a single bucket whose average size is less than min_sstable_size.
  • Let's assume we have SSTables of sizes (in MB) 78, 51, 100, 60, 19, 27, 34, 7, 1, 10.

    SSTable before compaction starts

    Let's see how the compaction algorithm works.

    Algorithm:

    • Sort the SSTables by size in descending order. After sorting, the SSTables (in MB) are: 100, 78, 60, 51, 34, 27, 19, 10, 7, 1.

      SSTable sorted before appearing for compaction

    • Categorize the SSTables into multiple buckets based on the configuration and the following condition (a Java sketch of this bucketing appears after this list):
      IF ((bucket avg size * bucket_low < SSTable size < bucket avg size * bucket_high) OR (SSTable size < min_sstable_size AND bucket avg size < min_sstable_size))
      THEN add the SSTable to the bucket and compute the new average size of the bucket,

      ELSE create a new bucket with the SSTable.

      With bucket_low as 0.5, bucket_high as 1.5, min_sstable_size as 32MB, min_threshold as 4 and max_threshold as 32, the following bucketing happens.

      Bucketing before compaction

    • Buckets are sorted according to their hotness (CASSANDRA-5515). The hotness of a bucket is the sum of the hotness of its SSTables. Cold buckets get lower priority than hot buckets for compaction.
    • Buckets that do not meet the min_threshold criterion are discarded. Buckets that contain more SSTables than max_threshold are trimmed before compaction: the SSTables in such a bucket are sorted by hotness and only the top max_threshold (32 by default) SSTables are considered.
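
    To make the bucketing step concrete, here is a minimal, illustrative Java sketch of the rules described above. It is not Cassandra's actual implementation; the class and method names are made up, and the thresholds are the values used in this example (bucket_low 0.5, bucket_high 1.5, min_sstable_size 32MB).

    import java.util.ArrayList;
    import java.util.List;

    public class StcsBucketingSketch {

        static final double BUCKET_LOW = 0.5;
        static final double BUCKET_HIGH = 1.5;
        static final long MIN_SSTABLE_SIZE = 32; // MB, the non-default value used in this example

        /** Groups SSTable sizes (already sorted in descending order) into buckets. */
        static List<List<Long>> bucket(List<Long> sortedSizes) {
            List<List<Long>> buckets = new ArrayList<>();
            List<Double> avgSizes = new ArrayList<>(); // running average size per bucket

            for (long size : sortedSizes) {
                boolean placed = false;
                for (int i = 0; i < buckets.size(); i++) {
                    double avg = avgSizes.get(i);
                    boolean inSizeRange = size > avg * BUCKET_LOW && size < avg * BUCKET_HIGH;
                    boolean bothSmall = size < MIN_SSTABLE_SIZE && avg < MIN_SSTABLE_SIZE;
                    if (inSizeRange || bothSmall) {
                        buckets.get(i).add(size);
                        int n = buckets.get(i).size();
                        avgSizes.set(i, (avg * (n - 1) + size) / n); // recompute the bucket average
                        placed = true;
                        break;
                    }
                }
                if (!placed) { // no matching bucket, so start a new one
                    List<Long> newBucket = new ArrayList<>();
                    newBucket.add(size);
                    buckets.add(newBucket);
                    avgSizes.add((double) size);
                }
            }
            return buckets;
        }

        public static void main(String[] args) {
            // the example SSTable sizes, already sorted descending
            System.out.println(bucket(List.of(100L, 78L, 60L, 51L, 34L, 27L, 19L, 10L, 7L, 1L)));
        }
    }

    Running this on the example sizes yields the buckets {100, 78, 60, 51} and {34, 27, 19, 10, 7, 1}; both contain at least min_threshold (4) SSTables, so both are candidates for compaction, with the hotter bucket compacted first.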

    Limitations:

    • Size tiered compaction doesn't give any guarantee about the column distribution of a particular row. Columns belonging to a particular row key may be spread across several SSTables; in such cases read performance suffers because the read operation has to touch every SSTable where those columns are present.
    • In the worst case it may need as much free disk space as the data being compacted (100%) to combine the SSTables; e.g. compacting four 100MB SSTables can temporarily require an extra 400MB.

Apache Pig : Group By, Nested Foreach, Join Example

Preface:

In this post we will learn basic Apache Pig operations like group by, nested foreach and join through examples.

Input Data:

For experimental purposes we have generated dummy test data.
We have the following datasets for these operations.

  1. Customer’s order history data :

    This data contains the orders placed by customers. For example, the customer with id ‘A’ ordered item ‘I’; the order date in milliseconds was ‘1391230800000’ and the delivery date in milliseconds was ‘1391317200000’.
    Schema : ( customerId:int, itemId:int, orderDate:long, deliveryDate:long)
    You can generate the dummy data with a simple script (an illustrative sketch appears after these dataset descriptions).

  2. Customer’s basic information data :

    This data contains basic customer information: customer id, name and city.
    Schema : ( customerId:int, name:chararray, city:chararray)
    You can generate this dummy test data with a similar script (see the sketch below).
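
    The generator scripts from the original post are not reproduced here. As a stand-in, the following minimal Java sketch (row counts and value ranges are arbitrary illustrative choices) writes comma-separated rows matching the two schemas above:

    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Random;

    public class DummyDataGenerator {
        public static void main(String[] args) throws IOException {
            Random rnd = new Random();
            long dayMs = 24L * 60 * 60 * 1000;

            // order history: customerId,itemId,orderDate,deliveryDate
            try (FileWriter orders = new FileWriter("testData100k")) {
                for (int i = 0; i < 100000; i++) {
                    int customerId = rnd.nextInt(1000);
                    int itemId = rnd.nextInt(50);
                    long orderDate = 1391230800000L + rnd.nextInt(365) * dayMs;
                    long deliveryDate = orderDate + (1 + rnd.nextInt(7)) * dayMs;
                    orders.write(customerId + "," + itemId + "," + orderDate + "," + deliveryDate + "\n");
                }
            }

            // customer information: customerId,name,city
            try (FileWriter customers = new FileWriter("customerInformation")) {
                for (int customerId = 0; customerId < 1000; customerId++) {
                    customers.write(customerId + ",name" + customerId + ",city" + rnd.nextInt(20) + "\n");
                }
            }
        }
    }

    The generated files can then be copied to HDFS (e.g. with hdfs dfs -put) to the paths used in the scripts below.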

Examples:

  1. GROUP BY:
    Let's start with the ‘group by’ operation.
    Problem Statement:
    Find the number of items bought by each customer.
    Input:
    Customer’s order history data.
    Output:
    Output should contain the total number of items bought by each customer.
    Schema of output should be : (customerId:int, itemCnt:long);

    
    -- Problem Stmt : find the number of items bought by each customer.
    -- load the input data :: Schema ( customerId , itemId , order Date, delivery Date );
    orders = load '/testData100k' using PigStorage(',') as (cstrId:int, itmId:int, orderDate: long, deliveryDate: long );
    -- group input by customer id
    grpd = group orders by cstrId;
    -- count number of items bought by each customer
    items_by_customer = foreach grpd generate group as cstrId, COUNT(orders) as itemCnt;
    describe items_by_customer;
    --items_by_customer: {cstrId: int,itemCnt: long}
    
  2. NESTED FOREACH:
    Now let's discuss the nested foreach operation. We can use the foreach operator to iterate over input records, and we can also apply different relational operations to each record within the same data processing pipeline.

    Problem Statement:
    Find the total number of items bought by each customer and which item he/she bought the highest number of times. For example, customer A has bought item ‘i1’ 10 times, ‘i2’ 5 times and ‘i3’ 7 times. So the total number of items bought by customer A is 10+5+7=22, out of which item ‘i1’ was bought the highest number of times.
    Input:
    Customer’s order history data.
    Output:
    Output should contain the total number of items bought by each customer and which item he/she bought the highest number of times.
    Schema of output should be :
    (customerId:int, itemId:int, highestBoughtItemCnt:long, totalItemCnt:long)

    
    -- Problem Stmt : find the number of items bought by each customer
    -- which item he/she bought highest time.
    -- load the input data :: Schema ( customerId , itemId , order Date, delivery Date );
    orders = load '/testData100k' using PigStorage(',') as (cstrId:int, itmId:int, orderDate: long, deliveryDate: long );
    -- group by customer-id and item-id
    grpd_cstr_itm = group orders by (cstrId,itmId);
    grpd_cstr_itm_cnt = foreach grpd_cstr_itm generate group.cstrId as cstrId, group.itmId as itmId, COUNT(orders) as itmCnt;
    -- group by cstrId 
    grpd_cstr = group grpd_cstr_itm_cnt by cstrId ;
    describe grpd_cstr;
    -- grpd_cstr: {group: int,grpd_cstr_itm_cnt: {cstrId: int,itmId: int,itmCnt: long}}
    -- iterate over grpd_cstr and find the total number of items bought by each customer and which item he/she bought the highest number of times.
    result = foreach grpd_cstr{
    	total_orders = SUM(grpd_cstr_itm_cnt.itmCnt);
    	srtd_orders = order grpd_cstr_itm_cnt by itmCnt desc;
    	higest_bought = limit srtd_orders 1;
    	generate FLATTEN(higest_bought),total_orders as totalCnt;
    };
    -- result will contain ( customer_id, item_id_bought_highest_times, number_of_times_it_was_bought, total_items );
    describe result;
    -- result: {higest_bought::cstrId: int,higest_bought::itmId: int,higest_bought::itmCnt: long,totalCnt: long}
    
  3. JOIN:
    As in an RDBMS, the join operator in Pig joins datasets based on values common to both datasets.
    We will join the customer order history dataset with the customer information dataset to get an interesting result.
    Problem Statement:
    Find the number of times each item was ordered by customers in each city.
    For example, customers from CityA ordered item ‘i1’ 300 times, and customers from CityB ordered item ‘i4’ 129 times.
    Input:
    Customer’s order history data.
    Customer’s basic information data.
    Output:
    Output should contain the number of times each item was ordered by customers in each city.
    Schema of output should be :
    (itemId:int, city:chararray, totalItemCnt:long)

    
    -- Problem Stmt : find the number of times each item was ordered by customers in each city.
    -- For e.g. customers from CityA have ordered the item 'item1' 350 times.
    -- load the input data :: Schema ( customerId , itemId , order Date, delivery Date );
    orders = load '/testData100k' using PigStorage(',') as (cstrId:int, itmId:int, orderDate: long, deliveryDate: long );
    -- load the customer information data :: (customerId , name , city )
    cstr_info = load '/customerInformation' using PigStorage(',') as (cstrId:int, name:chararray, city:chararray);
    -- join orders and customer_info by cstrId;
    jnd = join orders by cstrId, cstr_info by cstrId;
    describe jnd;
    -- jnd: {orders::cstrId: int,orders::itmId: int,orders::orderDate: long,orders::deliveryDate: long,cstr_info::cstrId: int,cstr_info::name: chararray,cstr_info::city: chararray}
    -- group  by itemId and city 
    jnd_grp = group jnd by (orders::itmId, cstr_info::city);
    describe jnd_grp;
    -- jnd_grp: {group: (orders::itmId: int,cstr_info::city: chararray),jnd: {orders::cstrId: int,orders::itmId: int,orders::orderDate: long,orders::deliveryDate: long,cstr_info::cstrId: int,cstr_info::name: chararray,cstr_info::city: chararray}}
    -- lets count and generate the result.
    result = foreach jnd_grp generate FLATTEN(group) , COUNT(jnd) as cnt;
    describe result;                                                     
    --result: {group::orders::itmId: int,group::cstr_info::city: chararray,cnt: long}
    

Apache Pig : Writing a User Defined Function (UDF)

Preface:

In this post we will write a basic/demo custom function for Apache Pig, called a UDF (User Defined Function).
Pig’s Java UDFs extend EvalFunc. This abstract class has an abstract method “exec” which the user needs to implement in a concrete class with the appropriate functionality.

Problem Statement:

Let’s write a simple Java UDF which takes as input a Tuple of two DataBags and checks whether the second databag (set) is a subset of the first databag (set).
For example, assume you have been given a tuple of two databags, each containing tuples of numbers.

Input:
Databag1 : {(10),(4),(21),(9),(50)}
Databag2 : {(9),(4),(50)}
Output:
True

The function should return true because Databag2 is a subset of Databag1.

From an implementation point of view:

As we are extending the abstract class EvalFunc, we will implement the exec function. In this function we’ll write the logic to determine whether the given set is a subset of the other. We will also override the outputSchema function to specify the output schema (boolean: true or false).


import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

/**
 * Find whether the given SetB is a subset of SetA.
 * <p>
 * 	input:
 * <br>setA : {(10),(4),(21),(9),(50)}</br>
 * <br>setB : {(9),(4),(50)}</br>
 * <br></br>
 *  output:
 * <br>true</br>
 * </p>
 * @author https://shrikantbang.wordpress.com
 *
 */
public class IsSubSet extends EvalFunc<Boolean> {

	@Override
	public Schema outputSchema(Schema input) {
		if(input.size()!=2){
			throw new IllegalArgumentException("input should contains two elements!");
		}

		List<FieldSchema> fields = input.getFields();
		for(FieldSchema f : fields){
			if(f.type != DataType.BAG){
				throw new IllegalArgumentException("input fields should be bag!");	
			}
		}
		return new Schema(new FieldSchema("isSubset",DataType.BOOLEAN));
	}

	private Set<Tuple> populateSet(DataBag dataBag){
		HashSet<Tuple> set = new HashSet<Tuple>();

		Iterator<Tuple> iter = dataBag.iterator();
		while(iter.hasNext()){
			set.add(iter.next());
		}
		return set;
	}

	@Override
	public Boolean exec(Tuple input) throws IOException {

		Set<Tuple> setA = populateSet((DataBag) input.get(0));
		Set<Tuple> setB = populateSet((DataBag) input.get(1));
		return setA.containsAll(setB) ? Boolean.TRUE : Boolean.FALSE;

	}

}

A Quick Test

Let’s test our UDF to check whether one set is a subset of another.


-- Register jar which contains UDF.
register '/home/hadoop/udf.jar';

-- Define function for use.
define isSubset IsSubSet();

-- let's assume we have a dataset as follows:
dump dataset;
--({(10),(4),(21),(9),(50)},{(9),(4),(50)})
--({(50),(78),(45),(7),(4)},{(7),(45),(50)})
--({(1),(2),(3),(4),(5)},{(4),(3),(50)})

-- lets check subset function
result = foreach dataset generate $0,$1,isSubset($0,$1);

dump result;
--({(10),(4),(21),(9),(50)},{(9),(4),(50)},true)
--({(50),(78),(45),(7),(4)},{(7),(45),(50)},false)
--({(1),(2),(3),(4),(5)},{(4),(3),(50)},false)


Apache Pig : Custom Load Function

Preface:
Apache Pig is a platform for analyzing large data sets on top of Hadoop. To load a custom input dataset, Pig uses a loader function which loads the data from the file system.

Pig’s loader function uses the specified InputFormat, which splits the input data into logical splits. The input format in turn uses a RecordReader which reads each input split and emits <key,value> pairs as input for the map function.

From an implementation point of view:
A custom loader function extends LoadFunc and provides implementations for the following abstract methods :

  1. This method sets the path of the input data.
     abstract void setLocation(String location, Job job)

  2. This method returns the InputFormat class which will be used to split the input data.
     abstract InputFormat getInputFormat()

  3. This method prepares to read the data. It takes a record reader and an input split as arguments.
     abstract void prepareToRead(RecordReader rr, PigSplit split)

  4. This method returns the next tuple to be processed.
     abstract Tuple getNext()

Example : Writing a custom load function to read an email dataset

Let’s write a custom load function to read an email dataset. Usually emails are stored under the user directory in sub-folders like inbox, outbox, spam, sent etc. The email header contains the sender, receivers, subject, date, message-ID and other metadata fields. We can parse the email header using Java APIs.

In the following example we will read each email file and parse the sender and receivers. Each emitted tuple will contain (sender,{(receiver-1),(receiver-2),…,(receiver-n)}). If all recipients are undisclosed, it will emit a tuple with an empty bag of recipients: (sender,{}).

Let’s divide this task into two parts:

Part I: Writing a custom Hadoop input format to read the email dataset :
The implementation of the custom input format is covered in the Hadoop : Custom Input Format post.

Part II: Writing a custom Pig loader function :
The following implementation of the custom load function reads each directory recursively and processes each email file.

import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

/**
 * Loader function which loads the input email data.
 */
public class EmailLoaderFunc extends LoadFunc {

	private RecordReader reader;
	private TupleFactory tupleFactory;

	public EmailLoaderFunc() {
		tupleFactory = TupleFactory.getInstance();
	}

	@Override
	public InputFormat getInputFormat() throws IOException {
		return new EmailInputFormat();
	}

	/*
	 * Each tuple will look like (sender,{(receiver1),(receiver2).....}).
	 * (non-Javadoc)
	 * @see org.apache.pig.LoadFunc#getNext()
	 */
	@Override
	public Tuple getNext() throws IOException {
		Tuple tuple = null;
		try {
			boolean notDone = reader.nextKeyValue();
			if (!notDone) {
				return null;
			}
			EmailParticipants edata = (EmailParticipants) reader.getCurrentKey();
			tuple=tupleFactory.newTuple(2);

			tuple.set(0, edata.getSender());
			DataBag dataBag = BagFactory.getInstance().newDefaultBag();
			Set<String> rcvrList = edata.getReceiver();
			for (String s : rcvrList) {
				if (s.trim().length() != 0) {
					Tuple t = tupleFactory.newTuple(1);
					t.set(0, s);
					dataBag.add(t);
				}
			}
			// always set the (possibly empty) bag so that we emit (sender,{})
			// when all recipients are undisclosed
			tuple.set(1, dataBag);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return tuple;
	}

	@Override
	public void prepareToRead(RecordReader reader, PigSplit pigSplit)
			throws IOException {
		this.reader = reader;
	}

	@Override
	public void setLocation(String location, Job job) throws IOException {
		FileInputFormat.setInputPaths(job, location);
	}

}

A Quick Test:

To quickly test the custom loader function:

  1. Create a jar which contains the load function.
  2. Register the jar.
  3. Load the input data using the custom loader function.

A quick example of an Apache Pig script:

register '/home/hadoop/Desktop/emailcustomloader.jar';

input_data = load '/home/hadoop/data/email-dataset/'
using EmailLoaderFunc() as
(sender:chararray,receiver:bag{t:tuple(rcvr:chararray)});


Hadoop : Custom Input Format

Preface:

Hadoop is a popular open source distributed computing framework. The data to be processed on top of Hadoop is usually stored on a distributed file system, e.g. HDFS (Hadoop Distributed File System).

To read the data to be processed, Hadoop comes with InputFormat, which has the following responsibilities:

  • Compute the input splits of data
  • Provide a logic to read the input split

From an implementation point of view:

InputFormat defines how to read the input data. Its responsibilities are:

  1. Compute the input splits of data :
    An InputSplit represents the part of the data to be processed by a Mapper instance. Each Mapper gets a unique input split to process.
    Before the MR job starts, InputFormat splits the data into multiple parts based on their logical boundaries and the HDFS block size.
    The following method computes the input splits.

    abstract List<InputSplit> getSplits(JobContext context)
  2. Provide a logic to read the input split :
    Each Mapper gets a unique input split to process. The input format provides the logic to read the split in the form of a RecordReader implementation. The record reader reads the input split and emits <Key,Value> pairs as input for each map function call.
    The following method creates a record reader for a given split.

    abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context)

Example : Writing a custom input format to read an email dataset

Let’s write a custom input format to read an email dataset. Usually emails are stored under the user directory in sub-folders like inbox, outbox, spam, sent etc. The email header contains the sender, receivers, subject, date, message-ID and other metadata fields. We can parse the email header using Java APIs.

In the following example we will read each email file and parse the sender and receivers. The input key for the map function will be the email participants (sender and receivers) and the input value will be NullWritable.

The following class contains the email participants (sender and receivers) and will serve as the input key for the map function.

// it contains email participants (sender and receivers)
public class EmailParticipants implements WritableComparable<EmailParticipants> {

	private Set<String> receivers = null;
	private String sender;
	private int receiverCnt;

	public EmailParticipants(String sender, Set<String> receivers) {
		this.sender = sender;
		this.receivers = receivers;
		receiverCnt = receivers.size();
	}

	public String getSender() {
		return sender;
	}

	public void setSender(String from) {
		this.sender = from;
	}

	public Set<String> getReceiver() {
		return receivers;
	}

	public void setReceivers(Set<String> receivers) {
		this.receivers = receivers;
	}

	public EmailParticipants() {
	}

	@Override
	public void readFields(DataInput dataIp) throws IOException {
		sender = dataIp.readUTF();
		receiverCnt = dataIp.readInt();
		receivers = new HashSet<String>(receiverCnt);
		for (int i = 0; i < receiverCnt; i++) {
			receivers.add(dataIp.readUTF());
		}
	}

	@Override
	public void write(DataOutput dataOp) throws IOException {
		// write the sender first so that readFields() reads the fields back in the same order
		dataOp.writeUTF(sender);
		dataOp.writeInt(receivers.size());
		Iterator<String> rcvr = receivers.iterator();
		while (rcvr.hasNext()) {
			dataOp.writeUTF(rcvr.next());
		}
	}

	@Override
	public int compareTo(EmailParticipants arg0) {
		return sender.compareTo(arg0.getSender());
	}
}

The following implementation of the input format recursively reads every file under the input data directory. Each input split contains a single email file, so the total number of splits equals the total number of emails.

// Input format for reading email dataset
public class EmailInputFormat extends FileInputFormat<EmailParticipants, NullWritable> {

	@Override
	public RecordReader<EmailParticipants, NullWritable> createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		return new EmailRecordReader();
	}

	@Override
	protected List<FileStatus> listStatus(JobContext job) throws IOException {
		return MapRedUtil.getAllFileRecursively(super.listStatus(job),
				job.getConfiguration());
	}

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodec codec =
				new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
		return codec == null;
	}
}

The record reader reads the entire split (one email file) in nextKeyValue() in one go and emits <Key,Value> as <EmailParticipants, NullWritable>, the input arguments for the map function.

// Record reader to parse email participants
public class EmailRecordReader extends
		RecordReader<EmailParticipants, NullWritable> {
	private boolean toggle = false;

	private NullWritable value = NullWritable.get();
	private Path file = null;
	private Configuration jc;
	private EmailParticipants emailData = null;

	public EmailRecordReader() {
	}

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		file = split.getPath();
		jc = context.getConfiguration();
	}

      public boolean nextKeyValue() throws IOException {
        if (toggle) {
          return false; // the single email in this split has already been emitted
        }
        InputStream mailFileInputStream = null;
        try {
            mailFileInputStream = FileSystem.get(jc).open(file);
            Properties props = new Properties();
            Session session = Session.getDefaultInstance(props, null);
            MimeMessage message = new MimeMessage(session, mailFileInputStream);
            MimeMessageParser mimeParser = new MimeMessageParser(message);
            Set<String> receivers = new HashSet<String>();
            populateMailParticipients(mimeParser.getTo(), receivers);
            populateMailParticipients(mimeParser.getCc(), receivers);
            populateMailParticipients(mimeParser.getBcc(), receivers);
            String sender = mimeParser.getFrom().replaceAll("\\.+", ".");
            emailData = new EmailParticipants(sender, receivers);
          } catch (MessagingException e) {
            e.printStackTrace();
            throw new IOException(e);
          } catch (Exception e) {
            e.printStackTrace();
            throw new IOException(e);
          }finally{
        	if (mailFileInputStream != null) {
        		mailFileInputStream.close();
        	}        
          }
          toggle = !toggle;
          return toggle;
       }

	private void populateMailParticipients(List<Address> participants,
			Set<String> list) {
		for (Address addr : participants) {
			String str = addr.toString().replaceAll("\\.+", ".");
			list.add(str);
		}
	}

	@Override
	public EmailParticipants getCurrentKey() throws IOException, InterruptedException {
		return emailData;
	}

	@Override
	public NullWritable getCurrentValue() {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return 0; // TODO
	}

	@Override
	public void close() throws IOException {
		// nothing to close here; the mail input stream is closed in nextKeyValue()
	}
}

A Quick Test:

To test the custom input format class we configure the Hadoop job as follows:

Job job = new Job(conf, "custom-input-format-test");
job.setInputFormatClass(EmailInputFormat.class);
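
For a fuller picture, here is a hedged sketch of a complete driver built around EmailInputFormat. The SenderCountMapper below is hypothetical (it is not part of the original post); it simply counts emails per sender in a map-only job.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmailJobDriver {

	// hypothetical mapper: emits (sender, 1) for every email read by EmailRecordReader
	public static class SenderCountMapper
			extends Mapper<EmailParticipants, NullWritable, Text, IntWritable> {
		private static final IntWritable ONE = new IntWritable(1);

		@Override
		protected void map(EmailParticipants key, NullWritable value, Context context)
				throws IOException, InterruptedException {
			context.write(new Text(key.getSender()), ONE);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "custom-input-format-test");
		job.setJarByClass(EmailJobDriver.class);

		// plug in the custom input format from this post
		job.setInputFormatClass(EmailInputFormat.class);
		job.setMapperClass(SenderCountMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setNumReduceTasks(0); // a map-only job is enough for a quick test

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Since no reducer runs, the output directory simply contains one (sender, 1) line per email read by the input format.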