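I need a Pig script that turns a single row containing a campaign ID, start date, end date, and amount into multiple rows: one row per day, each carrying the amount allocated to that day. For example, the schema is: campaign ID, start date, end date, total amount
My input row is: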
1,2015-01-01,2015-01-10,10000
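I need to create a separate row for each day of this "campaign", splitting the total amount evenly across the days, with the following schema:
campaign ID, date, amount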
1,2015-01-01,1000
1,2015-01-02,1000
1,2015-01-03,1000
... and so on, one row for each day of the campaign.
I was hoping I could use a nested FOREACH and the DaysBetween function.
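This is somewhat difficult to solve with standard Pig. The challenge is generating the dates between the two endpoints dynamically. If the range spans a month boundary (e.g., 2015-01-28 to 2015-02-06), Pig has no built-in way to work out that it needs 4 days in January and 6 days in February.
To solve this, one option is to move the date generation into a custom UDF that parses the input and generates the intermediate dates.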
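To make the month-boundary point concrete, here is a minimal standalone sketch of generating every date in an inclusive range. It uses the JDK's java.time classes rather than the Joda-Time classes used in the UDF further down, and the class and method names (DateRangeSketch, daysInclusive) are made up for illustration.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the UDF in this answer.
class DateRangeSketch {
    // Returns every date from start to end, both inclusive (ISO yyyy-MM-dd input).
    static List<LocalDate> daysInclusive(String start, String end) {
        LocalDate first = LocalDate.parse(start);
        LocalDate last = LocalDate.parse(end);
        long count = ChronoUnit.DAYS.between(first, last) + 1;
        List<LocalDate> days = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            days.add(first.plusDays(i)); // plusDays rolls over month ends
        }
        return days;
    }

    public static void main(String[] args) {
        // Prints 2015-01-28 through 2015-01-31, then 2015-02-01 through 2015-02-06.
        for (LocalDate d : daysInclusive("2015-01-28", "2015-02-06")) {
            System.out.println(d);
        }
    }
}
```

The date library handles the calendar arithmetic, so the caller never has to know how many days January has.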
Example 1: one input row, with dates that stay within a single month
Input:
1,2015-01-01,2015-01-10,10000
Pig script:
REGISTER PARSEDATE.jar;
A = LOAD 'input' Using PigStorage(',') AS (campaignId:int,startDate,endDate,totalAmount:int);
B = FOREACH A GENERATE campaignId,DaysBetween((datetime)endDate,(datetime)startDate)+1 AS cnt, totalAmount,TOBAG(*) AS mybag;
C = FOREACH B GENERATE campaignId,FLATTEN(TOKENIZE(mypackage.PARSEDATE(BagToString(mybag)),'#')),(int)(totalAmount/cnt) AS totalAmount;
STORE C INTO 'output' USING PigStorage(',');
Output:
1,2015-01-01,1000
1,2015-01-02,1000
1,2015-01-03,1000
1,2015-01-04,1000
1,2015-01-05,1000
1,2015-01-06,1000
1,2015-01-07,1000
1,2015-01-08,1000
1,2015-01-09,1000
1,2015-01-10,1000
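The script and the UDF exchange data as delimited strings, which is easy to miss when reading the script. The plain-Java sketch below traces that handoff under two assumptions: that BagToString joins the fields with '_' (which is what the UDF's split("_") expects), and that Pig's TOKENIZE drops the empty token in front of the leading '#' the same way java.util.StringTokenizer does. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical trace of the string handoff; it does not call Pig itself.
class HandoffSketch {
    // Joins the row's fields with '_', the format the UDF expects to receive.
    static String joinFields(String... fields) {
        return String.join("_", fields);
    }

    // Splits a '#'-delimited date list; StringTokenizer skips the empty
    // token that would otherwise precede the leading '#'.
    static List<String> splitDates(String udfOutput) {
        List<String> dates = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(udfOutput, "#");
        while (tokens.hasMoreTokens()) {
            dates.add(tokens.nextToken());
        }
        return dates;
    }

    public static void main(String[] args) {
        String joined = joinFields("1", "2015-01-01", "2015-01-10", "10000");
        System.out.println(joined);               // 1_2015-01-01_2015-01-10_10000
        System.out.println(joined.split("_")[1]); // 2015-01-01
        System.out.println(splitDates("#2015-01-01#2015-01-02")); // [2015-01-01, 2015-01-02]
    }
}
```

One consequence of this design is that no field may itself contain '_' or '#', otherwise the positions the UDF reads would shift.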
Example 2: two input rows, the first within a single month and the second spanning two months
Input:
1,2015-01-01,2015-01-10,10000
2,2015-01-28,2015-02-06,10000
Pig script:
REGISTER PARSEDATE.jar;
A = LOAD 'input1' Using PigStorage(',') AS (campaignId:int,startDate,endDate,totalAmount:int);
B = FOREACH A GENERATE campaignId,DaysBetween((datetime)endDate,(datetime)startDate)+1 AS cnt, totalAmount,TOBAG(*) AS mybag;
C = FOREACH B GENERATE campaignId,FLATTEN(TOKENIZE(mypackage.PARSEDATE(BagToString(mybag)),'#')),(int)(totalAmount/cnt) AS totalAmount;
STORE C INTO 'output1' USING PigStorage(',');
Output:
1,2015-01-01,1000
1,2015-01-02,1000
1,2015-01-03,1000
1,2015-01-04,1000
1,2015-01-05,1000
1,2015-01-06,1000
1,2015-01-07,1000
1,2015-01-08,1000
1,2015-01-09,1000
1,2015-01-10,1000
2,2015-01-28,1000
2,2015-01-29,1000
2,2015-01-30,1000
2,2015-01-31,1000
2,2015-02-01,1000
2,2015-02-02,1000
2,2015-02-03,1000
2,2015-02-04,1000
2,2015-02-05,1000
2,2015-02-06,1000
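The per-day amount in both examples comes from the inclusive day count and an integer division. The sketch below mirrors those two expressions in plain Java, using java.time instead of Pig's DaysBetween; the names are hypothetical. It also shows that a total that does not divide evenly loses its remainder, which the sample data happens to avoid.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical mirror of the script's arithmetic; it does not call Pig itself.
class DailyAmountSketch {
    // Inclusive day count, mirroring DaysBetween(endDate, startDate) + 1.
    static long dayCount(String start, String end) {
        return ChronoUnit.DAYS.between(LocalDate.parse(start), LocalDate.parse(end)) + 1;
    }

    // Integer division, mirroring (int)(totalAmount/cnt); any remainder is dropped.
    static int perDay(int totalAmount, long days) {
        return (int) (totalAmount / days);
    }

    public static void main(String[] args) {
        long days = dayCount("2015-01-28", "2015-02-06");
        System.out.println(days);                // 10
        System.out.println(perDay(10000, days)); // 1000
        System.out.println(perDay(10000, 3));    // 3333, so 1 unit of the total is lost
    }
}
```

If the daily rows must add up to the total exactly, the remainder would need to be assigned to one of the days; the script as written does not do that.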
You need to compile the following Java code into PARSEDATE.jar and register it in your Pig script. I wrote this code quickly, so optimize it as needed.
PARSEDATE.java
package mypackage;
import java.io.*;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.joda.time.LocalDate;
import org.joda.time.Days;
public class PARSEDATE extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        // Nothing to parse for an empty or null tuple
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return null;
        }
        // The row arrives as one '_'-delimited string: id_startDate_endDate_amount
        String[] fields = ((String) input.get(0)).split("_");
        LocalDate st = new LocalDate(fields[1]); // start date, second column
        LocalDate et = new LocalDate(fields[2]); // end date, third column
        // Number of days in the range, counting both endpoints
        int days = Days.daysBetween(st, et).getDays() + 1;
        // Prefix every date with '#' so the Pig script can split on it
        StringBuilder output = new StringBuilder();
        for (int index = 0; index < days; index++) {
            output.append('#').append(st.plusDays(index));
        }
        return output.toString();
    }
}