我需要在Pig中处理一个数据集,该数据集每天午夜可用一次。因此,我有一个Oozie协调员,负责日程安排,并在每天00:00生成一个工作流。文件名遵循URI方案
hdfs://${dataRoot}/input/raw${YEAR}${MONTH}${DAY}${HOUR}.avro
其中${HOUR}总是"00"。
数据集中的每个条目都包含一个UNIX时间戳,我想过滤掉那些在晚上11:45(23:45)之前有时间戳的条目。由于我需要在过去的数据集上运行,因此需要根据当前处理的日期动态设置定义阈值的时间戳值。例如,从2013年12月12日开始处理数据集需要阈值1418337900。因此,必须由协调器来设置阈值。
据我所知,在EL中不可能将格式化的日期转换为UNIX时间戳。我想出了一个非常巧妙的解决方案:协调器将阈值的日期和时间传递给相应的工作流,该工作流启动Pig脚本的参数化实例。
coordinator.xml摘录:
<property>
<name>threshold</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -15, 'MINUTE'), 'yyyyMMddHHmm')}</value>
</property>
workflow.xml摘录:
<action name="foo">
<pig>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<script>${applicationPath}/flights.pig</script>
<param>jobInput=${jobInput}</param>
<param>jobOutput=${jobOutput}</param>
<param>threshold=${threshold}</param>
</pig>
<ok to="end"/>
<error to="error"/>
</action>
Pig脚本需要将此格式化的日期时间转换为UNIX时间戳。为此,我编写了一个UDF:
public class UnixTime extends EvalFunc<Long> {
private long myTimestamp = 0L;
private static long convertDateTime(String dt, String format)
throws IOException {
DateFormat formatter;
Date date = null;
formatter = new SimpleDateFormat(format);
try {
date = formatter.parse(dt);
} catch (ParseException ex) {
throw new IOException("Illegal Date: " + dt + " format: " + format);
}
return date.getTime() / 1000L;
}
public UnixTime(String dt, String format) throws IOException {
myTimestamp = convertDateTime(dt, format);
}
@Override
public Long exec(Tuple input) throws IOException {
return myTimestamp;
}
}
在Pig脚本中,创建了一个宏,使用协调器/工作流的输入初始化UDF。然后,您可以过滤时间戳。
DEFINE THRESH mystuff.pig.UnixTime('$threshold', 'yyyyMMddHHmm');
d = LOAD '$jobInput' USING PigStorage(',') AS (time: long, value: chararray);
f = FILTER d BY d <= THRESH();
...
我遇到的问题让我想到了一个更普遍的问题,如果可以转换Pig中的输入参数,并将其再次用作某种常量。有没有更好的方法来解决这个问题,或者我的方法是否不必要地复杂?
编辑:TL;DR
经过更多的搜索,我发现有人有同样的问题:http://grokbase.com/t/pig/user/125gszzxnx/survey-where-are-all-the-udfs-and-macros
感谢Gaurav推荐存钱罐中的UDF。如果不使用声明和shell脚本,似乎就没有高性能的解决方案。
您可以将Pig脚本放入Python脚本中并传递值。
#!/usr/bin/python
import sys
import time
from org.apache.pig.scripting import Pig
P = Pig.compile("""d = LOAD '$jobInput' USING PigStorage(',') AS (time: long, value: chararray);
f = FILTER d BY d <= '$thresh';
""")
jobinput = {whatever you defined}
thresh = {whatever you defined in the UDF}
Q = P.bind({'thresh':thresh,'jobinput':jobinput})
results = Q.runSingle()
if results.isSuccessful() == "FAILED":
raise "Pig job failed"