Pig script: expand a single row with a start date and an end date into multiple rows, one per day



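>I need a Pig script that converts a single row containing a campaign ID, start date, end date, and amount into multiple rows: one row per day, each carrying the amount allocated to that day. For example, the schema is: campaign ID, start date, end date, total amount

My input row is: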

1,2015-01-01,2015-01-10,10000

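I need to create a separate row for each day of this "campaign", dividing the total amount evenly across the days, with the following schema:

campaign ID, date, amount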

1,2015-01-01,1000
1,2015-01-02,1000
1,2015-01-03,1000

... and so on, one row for each day of the campaign.

I was hoping I could use a nested FOREACH and the DaysBetween function.

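This problem is somewhat difficult to solve with standard Pig. The challenge is generating the dates between the two endpoints dynamically. For example, if the range crosses a month boundary (e.g., 2015-01-28 to 2015-02-06), Pig has no built-in logic to generate 4 days in January and 6 days in February.

To work around this, one option is to move the date generation into a custom UDF that parses the input and produces the intermediate dates.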
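The date-generation step can be sketched in plain Java as follows. This is only an illustration under stated assumptions: it uses the standard `java.time` API rather than the Joda-Time classes used in the UDF, and the class and method names (`DateRangeSketch`, `expand`) are hypothetical, not part of the original answer.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the original answer.
class DateRangeSketch {

    // Returns every date from start to end, both endpoints included.
    static List<LocalDate> expand(LocalDate start, LocalDate end) {
        long days = ChronoUnit.DAYS.between(start, end) + 1; // inclusive day count
        List<LocalDate> dates = new ArrayList<>();
        for (long i = 0; i < days; i++) {
            dates.add(start.plusDays(i)); // plusDays rolls over month boundaries
        }
        return dates;
    }

    public static void main(String[] args) {
        // A range that crosses from January into February.
        List<LocalDate> dates =
                expand(LocalDate.parse("2015-01-28"), LocalDate.parse("2015-02-06"));
        for (LocalDate d : dates) {
            System.out.println(d);
        }
    }
}
```

Because the calendar arithmetic is delegated to `plusDays`, the month boundary needs no special handling in the loop.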

Example 1: one input row whose dates do not cross a month boundary

Input:

1,2015-01-01,2015-01-10,10000

Pig script:

REGISTER PARSEDATE.jar; 
A = LOAD 'input' Using PigStorage(',') AS (campaignId:int,startDate,endDate,totalAmount:int);
B = FOREACH A GENERATE campaignId,DaysBetween((datetime)endDate,(datetime)startDate)+1 AS cnt, totalAmount,TOBAG(*) AS mybag;
C = FOREACH B GENERATE campaignId,FLATTEN(TOKENIZE(mypackage.PARSEDATE(BagToString(mybag)),'#')),(int)(totalAmount/cnt) AS totalAmount;
STORE C INTO 'output' USING PigStorage(',');

Output:

1,2015-01-01,1000
1,2015-01-02,1000
1,2015-01-03,1000
1,2015-01-04,1000
1,2015-01-05,1000
1,2015-01-06,1000
1,2015-01-07,1000
1,2015-01-08,1000
1,2015-01-09,1000
1,2015-01-10,1000
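The script's `(int)(totalAmount/cnt)` is integer division, so the daily amounts add up to the total only when the total is evenly divisible by the number of days, as it is here. If the remainder matters, one possible approach (an assumption, not something the original answer does) is to give the leftover units to the first days. A minimal Java sketch, with hypothetical names and assuming a non-negative total:

```java
import java.util.Arrays;

// Hypothetical helper for illustration; not part of the original answer.
class DailyAllocationSketch {

    // Splits total across days; the first (total % days) days get one extra unit.
    // Assumes total is non-negative.
    static int[] allocate(int total, int days) {
        if (days <= 0) {
            throw new IllegalArgumentException("days must be positive");
        }
        int base = total / days;      // truncating division, as in the Pig script
        int remainder = total % days; // units that truncation alone would drop
        int[] perDay = new int[days];
        for (int i = 0; i < days; i++) {
            perDay[i] = base + (i < remainder ? 1 : 0);
        }
        return perDay;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(allocate(10000, 10)));
        System.out.println(Arrays.toString(allocate(10000, 3)));
    }
}
```

With 10000 split over 3 days, plain truncation would yield 3333 per day and lose one unit; the sketch assigns that unit to the first day so the daily amounts still sum to the total.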

Example 2: two input rows; the first does not cross a month boundary, the second does

Input 1:

1,2015-01-01,2015-01-10,10000
2,2015-01-28,2015-02-06,10000

Pig script:

REGISTER PARSEDATE.jar; 
A = LOAD 'input1' Using PigStorage(',') AS (campaignId:int,startDate,endDate,totalAmount:int);
B = FOREACH A GENERATE campaignId,DaysBetween((datetime)endDate,(datetime)startDate)+1 AS cnt, totalAmount,TOBAG(*) AS mybag;
C = FOREACH B GENERATE campaignId,FLATTEN(TOKENIZE(mypackage.PARSEDATE(BagToString(mybag)),'#')),(int)(totalAmount/cnt) AS totalAmount;
STORE C INTO 'output1' USING PigStorage(',');

Output:

1,2015-01-01,1000
1,2015-01-02,1000
1,2015-01-03,1000
1,2015-01-04,1000
1,2015-01-05,1000
1,2015-01-06,1000
1,2015-01-07,1000
1,2015-01-08,1000
1,2015-01-09,1000
1,2015-01-10,1000
2,2015-01-28,1000
2,2015-01-29,1000
2,2015-01-30,1000
2,2015-01-31,1000
2,2015-02-01,1000
2,2015-02-02,1000
2,2015-02-03,1000
2,2015-02-04,1000
2,2015-02-05,1000
2,2015-02-06,1000

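You need to compile the Java code below, package it as PARSEDATE.jar, and register that jar in your Pig script. I wrote this code quickly, so feel free to optimize it as needed.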

PARSEDATE.java
package mypackage;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.joda.time.Days;
import org.joda.time.LocalDate;

public class PARSEDATE extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
                // Guard against missing input
                if (input == null || input.size() == 0 || input.get(0) == null) {
                        return null;
                }
                // Input is the BagToString output, e.g. "1_2015-01-01_2015-01-10_10000"
                String[] fields = ((String) input.get(0)).split("_");
                // The start date is the second field and the end date is the third
                LocalDate st = new LocalDate(fields[1]);
                LocalDate et = new LocalDate(fields[2]);
                // Number of days in the range, counting both endpoints
                int days = Days.daysBetween(st, et).getDays() + 1;
                // Append every date, each prefixed with '#', so the Pig script can split on '#'
                StringBuilder output = new StringBuilder();
                for (int index = 0; index < days; index++) {
                        output.append('#').append(st.plusDays(index).toString());
                }
                return output.toString();
        }
}
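The hand-off between the script and the UDF relies on two string formats: `BagToString` joins the fields with its default `_` delimiter, and the UDF returns the dates joined with `#` for `TOKENIZE` to split. The sketch below mimics that round trip in plain Java so the logic can be checked without a Pig installation. It is an approximation under stated assumptions: it uses `java.time` instead of Joda-Time, and the class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the UDF logic; not part of the original answer.
class ParseDateHandoffSketch {

    // Takes "id_start_end_amount" and returns "#date1#date2...".
    static String toDelimitedDates(String bagString) {
        String[] fields = bagString.split("_");
        LocalDate start = LocalDate.parse(fields[1]);
        LocalDate end = LocalDate.parse(fields[2]);
        StringBuilder out = new StringBuilder();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            out.append('#').append(d);
        }
        return out.toString();
    }

    // Splits on '#' and drops the empty token produced by the leading '#'.
    static List<String> tokenize(String delimited) {
        List<String> tokens = new ArrayList<>();
        for (String t : delimited.split("#")) {
            if (!t.isEmpty()) {
                tokens.add(t);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        String delimited = toDelimitedDates("1_2015-01-01_2015-01-03_10000");
        System.out.println(delimited);
        System.out.println(tokenize(delimited));
    }
}
```

Note that this format depends on none of the fields containing `_` or `#`; that holds for the sample data here, but it is worth checking for other inputs.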
