Apache Pig: Group By, Nested Foreach, Join Example

Preface:

In this post we will learn basic Apache Pig operations such as group by, foreach, nested foreach and join through examples.

Input Data:

For experimental purposes we have generated dummy test data.
We use the following datasets for the operations.

  1. Customer’s order history data:

    This data contains the orders placed by customers. For example, a record may say that customer ‘A’ ordered item ‘I’, with an order date of ‘1391230800000’ and a delivery date of ‘1391317200000’ (both in milliseconds).
    Schema: (customerId:int, itemId:int, orderDate:long, deliveryDate:long)
    You can generate the dummy data using the following script:

  2. Customer’s basic information data:

    This data contains basic information about each customer: customer id, name and city.
    Schema: (customerId:int, name:chararray, city:chararray)
    You can generate the dummy test data using the following script:
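As a stand-in for the generator scripts, here is a minimal Python sketch that produces comma-separated rows matching both schemas. It is hypothetical: the function names (generate_orders, generate_customers, write_csv), the record counts, the ID ranges, the city names, the 30-day order window and the 1 to 7 day delivery delay are all assumptions, not the author's choices.

```python
import csv
import random

# Hypothetical dummy-data generator (not the original script). All ranges,
# names and counts are illustrative assumptions.
DAY_MS = 24 * 60 * 60 * 1000  # one day in milliseconds


def generate_orders(n_orders, n_customers, n_items, start_ms, seed=0):
    """Return rows of (customerId, itemId, orderDate, deliveryDate)."""
    rng = random.Random(seed)
    rows = []
    for _ in range(n_orders):
        customer_id = rng.randint(1, n_customers)
        item_id = rng.randint(1, n_items)
        # assumed: orders fall within 30 days after start_ms
        order_ms = start_ms + rng.randint(0, 30) * DAY_MS
        # assumed: delivery happens 1 to 7 days after the order
        delivery_ms = order_ms + rng.randint(1, 7) * DAY_MS
        rows.append((customer_id, item_id, order_ms, delivery_ms))
    return rows


def generate_customers(n_customers, cities, seed=0):
    """Return one (customerId, name, city) row per customer id."""
    rng = random.Random(seed)
    return [(cid, f"name{cid}", rng.choice(cities))
            for cid in range(1, n_customers + 1)]


def write_csv(path, rows):
    """Write rows as plain comma-separated lines with no header, as PigStorage(',') reads them."""
    with open(path, "w", newline="") as f:
        csv.writer(f, lineterminator="\n").writerows(rows)
```

For example, write_csv("testData100k", generate_orders(100000, 1000, 50, 1391230800000)) and write_csv("customerInformation", generate_customers(1000, ["CityA", "CityB", "CityC"])) produce local files that can then be copied to HDFS (e.g. with hadoop fs -put) so that the '/testData100k' and '/customerInformation' load paths in the scripts resolve.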

Examples:

  1. GROUP BY:
    Let's start with the ‘group by’ operation.
    Problem Statement:
    Find the number of items bought by each customer.
    Input:
    Customer’s order history data.
    Output:
    Output should contain the total number of items bought by each customer.
    Schema of output should be: (customerId:int, itemCnt:long)

    
    -- Problem Stmt : find the number of items bought by each customer.
    -- load the input data :: Schema ( customerId , itemId , order Date, delivery Date );
    orders = load '/testData100k' using PigStorage(',') as (cstrId:int, itmId:int, orderDate: long, deliveryDate: long );
    -- group input by customer id
    grpd = group orders by cstrId;
    -- count number of items bought by each customer
    items_by_customer = foreach grpd generate group as cstrId, COUNT(orders) as itemCnt;
    describe items_by_customer;
    --items_by_customer: {cstrId: int,itemCnt: long}
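To make the semantics concrete, here is a small Python model of what the group-and-count script computes. It is an illustration, not Pig itself: each order is assumed to be a (cstrId, itmId, orderDate, deliveryDate) tuple, and the result is sorted only for readability, since Pig does not guarantee any output order.

```python
from collections import Counter


def items_by_customer(orders):
    """Model of:
        grpd = group orders by cstrId;
        foreach grpd generate group as cstrId, COUNT(orders) as itemCnt;
    orders: iterable of (cstrId, itmId, orderDate, deliveryDate) tuples.
    Returns (cstrId, itemCnt) pairs sorted by cstrId.
    """
    # group by cstrId and count the tuples in each group's bag
    counts = Counter(cstr_id for cstr_id, _itm, _od, _dd in orders)
    return sorted(counts.items())


orders = [(1, 10, 0, 0), (1, 11, 0, 0), (2, 10, 0, 0)]
print(items_by_customer(orders))  # [(1, 2), (2, 1)]
```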
    
  2. NESTED FOREACH:
    Now let's discuss the nested foreach operation. The foreach operator iterates over input records, and inside a nested foreach block we can also apply relational operators such as ORDER BY, LIMIT, FILTER and DISTINCT to the bag in each record, within the same data processing pipeline.

    Problem Statement:
    Find the total number of items bought by each customer and the item he/she bought the most times. For example, customer A bought item ‘i1’ 10 times, ‘i2’ 5 times and ‘i3’ 7 times. So the total number of items bought by customer A is 10 + 5 + 7 = 22, and ‘i1’ is the item bought the most times.
    Input:
    Customer’s order history data.
    Output:
    Output should contain the total number of items bought by each customer and the item he/she bought the most times.
    Schema of output should be:
    (customerId:int, itemId:int, highestBoughtItemCnt:long, totalItemCnt:long)

    
    -- Problem Stmt : find the number of items bought by each customer
    -- and which item he/she bought the most times.
    -- load the input data :: Schema ( customerId , itemId , order Date, delivery Date );
    orders = load '/testData100k' using PigStorage(',') as (cstrId:int, itmId:int, orderDate: long, deliveryDate: long );
    -- group by customer id and item id
    grpd_cstr_itm = group orders by (cstrId,itmId);
    grpd_cstr_itm_cnt = foreach grpd_cstr_itm generate group.cstrId as cstrId, group.itmId as itmId, COUNT(orders) as itmCnt;
    -- group by cstrId 
    grpd_cstr = group grpd_cstr_itm_cnt by cstrId ;
    describe grpd_cstr;
    -- grpd_cstr: {group: int,grpd_cstr_itm_cnt: {cstrId: int,itmId: int,itmCnt: long}}
    -- iterate over grpd_cstr and find the total number of items bought by each customer and the item he/she bought the most times.
    result = foreach grpd_cstr{
    	total_orders = SUM(grpd_cstr_itm_cnt.itmCnt);
    	srtd_orders = order grpd_cstr_itm_cnt by itmCnt desc;
    	higest_bought = limit srtd_orders 1;
    	generate FLATTEN(higest_bought),total_orders as totalCnt;
    };
    -- result will contain (customer_id, item_id_bought_most_times, number_of_times_it_was_bought, total_items);
    describe result;
    -- result: {higest_bought::cstrId: int,higest_bought::itmId: int,higest_bought::itmCnt: long,totalCnt: long}
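The same kind of Python model can trace the nested foreach step by step, using the example from the problem statement. Again this is only an illustrative sketch with assumed tuple layouts. One difference is tie handling: when two items share the top count, `limit srtd_orders 1` may return either of them, while this model deterministically keeps the one it saw first (Python's sort is stable).

```python
from collections import Counter, defaultdict


def top_item_per_customer(orders):
    """Model of the script above; returns (cstrId, itmId, itmCnt, totalCnt) tuples.

    orders: iterable of (cstrId, itmId, orderDate, deliveryDate) tuples.
    """
    # grpd_cstr_itm_cnt: group by (cstrId, itmId) and COUNT each group
    per_item = Counter((cstr_id, itm_id) for cstr_id, itm_id, _od, _dd in orders)

    # grpd_cstr: regroup those counts by cstrId, one bag per customer
    bags = defaultdict(list)
    for (cstr_id, itm_id), itm_cnt in per_item.items():
        bags[cstr_id].append((cstr_id, itm_id, itm_cnt))

    result = []
    for cstr_id in sorted(bags):
        bag = bags[cstr_id]
        total_orders = sum(itm_cnt for _c, _i, itm_cnt in bag)        # SUM(...itmCnt)
        srtd_orders = sorted(bag, key=lambda t: t[2], reverse=True)  # order ... by itmCnt desc
        highest_bought = srtd_orders[0]                              # limit srtd_orders 1
        result.append(highest_bought + (total_orders,))              # FLATTEN(...), total
    return result


# Customer 1 buys item 1 ten times, item 2 five times and item 3 seven times.
orders = [(1, 1, 0, 0)] * 10 + [(1, 2, 0, 0)] * 5 + [(1, 3, 0, 0)] * 7
print(top_item_per_customer(orders))  # [(1, 1, 10, 22)]
```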
    
  3. JOIN:
    As in an RDBMS, the join operator in Pig joins datasets based on values common to each dataset.
    We will join the customer order history dataset with the customer information dataset so that each order carries the customer's city.
    Problem Statement:
    Find how many times each item was ordered by customers from each city.
    For example, customers from CityA ordered item ‘i1’ 300 times, and customers from CityB ordered item ‘i4’ 129 times.
    Input:
    Customer’s order history data.
    Customer’s basic information data.
    Output:
    Output should contain, for each item and city, the number of times customers from that city ordered that item.
    Schema of output should be :
    (itemId:int, city:chararray, totalItemCnt:long)

    
    -- Problem Stmt : find how many times each item was ordered by customers from each city.
    -- e.g. customers from CityA ordered the item 'item1' 350 times.
    -- load the input data :: Schema ( customerId , itemId , order Date, delivery Date );
    orders = load '/testData100k' using PigStorage(',') as (cstrId:int, itmId:int, orderDate: long, deliveryDate: long );
    -- load the customer information data :: (customerId , name , city )
    cstr_info = load '/customerInformation' using PigStorage(',') as (cstrId:int, name:chararray, city:chararray);
    -- join orders and cstr_info by cstrId;
    jnd = join orders by cstrId, cstr_info by cstrId;
    describe jnd;
    -- jnd: {orders::cstrId: int,orders::itmId: int,orders::orderDate: long,orders::deliveryDate: long,cstr_info::cstrId: int,cstr_info::name: chararray,cstr_info::city: chararray}
    -- group  by itemId and city 
    jnd_grp = group jnd by (orders::itmId, cstr_info::city);
    describe jnd_grp;
    -- jnd_grp: {group: (orders::itmId: int,cstr_info::city: chararray),jnd: {orders::cstrId: int,orders::itmId: int,orders::orderDate: long,orders::deliveryDate: long,cstr_info::cstrId: int,cstr_info::name: chararray,cstr_info::city: chararray}}
    -- let's count and generate the result.
    result = foreach jnd_grp generate FLATTEN(group), COUNT(jnd) as cnt;
    describe result;
    --result: {group::orders::itmId: int,group::cstr_info::city: chararray,cnt: long}
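Finally, a Python model of the join-then-group script, under the same assumption that rows are plain tuples in the column order of the two LOAD schemas. It follows Pig's default inner join: orders whose cstrId has no matching customer are dropped, and so are null keys, which Pig's inner join filters out.

```python
from collections import Counter


def item_counts_by_city(orders, customers):
    """Model of the join + group script above.

    orders:    (cstrId, itmId, orderDate, deliveryDate) tuples
    customers: (cstrId, name, city) tuples
    Returns (itmId, city, cnt) tuples sorted by itmId, then city.
    """
    # index customer rows by cstrId; a list, because ids are not forced unique
    by_id = {}
    for row in customers:
        by_id.setdefault(row[0], []).append(row)

    # jnd: inner join on cstrId; null keys never match
    jnd = [order + cust
           for order in orders if order[0] is not None
           for cust in by_id.get(order[0], [])]

    # jnd_grp + COUNT: group by (orders::itmId, cstr_info::city), i.e. row[1], row[6]
    counts = Counter((row[1], row[6]) for row in jnd)
    return sorted((itm, city, cnt) for (itm, city), cnt in counts.items())
```

If orders from customers missing in the information dataset should be kept (with a null city), Pig's `join orders by cstrId left outer, cstr_info by cstrId` would be the variant to use instead of the inner join.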
    


One thought on “Apache Pig: Group By, Nested Foreach, Join Example”

  1. Hi,
    Your blog is very useful. I am trying to solve a problem similar to the one in the “Nested foreach” example. However, I am getting the error below. Please help if you know of a solution.
    “[main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1200: Pig script failed to parse:
    expression is not a project expression: (Name: ScalarExpression) Type: null Uid: null)”

    This is my code so far (it is along similar lines, except that I sum the value multiple times due to multiple grouping):
    individualHist = LOAD '/nfs/data/cdom_faster/csv/WeekendIndividual.csv' USING PigStorage(',') AS (card_no:chararray, org:int, des:int, n_hour:int, count:int);
    grpCOD = group individualHist by (card_no,org,des);
    grpCOD_cnt = foreach grpCOD generate group.card_no as card_no, group.org as org, group.des as des, SUM(individualFarecardHist.count) as CODcount;
    grpCO = group individualFarecardHist by (card_no,org);

    result = foreach grpCO{
    totalCount = SUM(grpCOD_cnt.CODcount);
    srtd_counts = order grpCOD_cnt by CODcount desc;
    highest_count = limit srtd_counts 1;
    generate FLATTEN(highest_count),totalCount;
    };
