Apache Pig: add a single-field dataset to another dataset as a new column



Suppose we have this scenario:

dataset1.csv:

datefield
field11, field12, field13
field21, field22, field23
field31, field32, field33

What is the best way to get this?:

field11, field12, field13, datefield
field21, field22, field23, datefield
field31, field32, field33, datefield

I tried generating one dataset (relation1) with the following columns (after LOAD and GENERATE):

field11, field12, field13
field21, field22, field23
field31, field32, field33

And another one (relation2) with only this column (after LOAD and GENERATE):

datefield

And then doing this:

finalResult = FOREACH dataset1 GENERATE UDFFunction1(relation1::f1) as firstFields, UDFFunction2(relation2::f2) as lastField

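But I get the error "A column needs to be projected from a relation for it to be used as a scalar".

The problem is with the second field (the one holding the date field).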

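I would like to avoid a join, since that would be a somewhat messy workaround.

Any suggestions?

Please disregard my UDFs. They just format the input tuples accordingly.

Adding the Pig script: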

register 's3://bucketName/lib/MyJar.jar';
define ParseOutFilesUDF packageName.ParseOutFiles;
define FormatTimestartedUDF packageName.FormatTimestarted;
outFile = LOAD 's3://bucketName/input/' USING PigStorage ('|');
-- This UDF just reformats each tuple, adding a String to it and returning a new tuple.
resultAll = FOREACH outFile GENERATE ParseOutFilesUDF(*) as initial;
--load the same csv again to get the TIMESTARTED field
timestarted = LOAD 's3://bucketName/input/' USING PigStorage ('|') as f1;
-- filter to get only one record, which is something like TIMESTARTED=20160101
filetered = FILTER timestarted BY (f1 matches '.*TIMESTARTED.*');
timestarted = foreach filetered GENERATE $0 as fechaStarted;
-- the FormatTimestartedUDF just gets rid of 'TIMESTARTED=' in order to get the date '20160101'
-- this FOREACH statement is where it fails with 'A column needs to be projected...'
finalResult = FOREACH outFile GENERATE f1, FormatTimestartedUDF(timestarted) as f2;
STORE finalResult INTO 's3://bucketName/output/';

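You get the error because the f1 you reference does not exist in outFile, and timestarted is a relation, not a field. Also, you should be using the fields from resultAll and filtered.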

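outFile = LOAD 's3://bucketName/input/' USING PigStorage ('|');
resultAll = FOREACH outFile GENERATE ParseOutFilesUDF(*) as initial;
timestarted = LOAD 's3://bucketName/input/' USING PigStorage ('|') as (f1:chararray);
-- filtered must contain exactly one row for the scalar projection below to work
filtered = FILTER timestarted BY (f1 matches '.*TIMESTARTED.*');
-- alias names are case-sensitive: resultAll, not resultALL; initial is a field of resultAll
-- filtered.$0 projects a column from the single-row relation so it can be used as a scalar
finalResult = FOREACH resultAll GENERATE initial, FormatTimestartedUDF(filtered.$0);
STORE finalResult INTO 's3://bucketName/output/';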
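
For comparison, here is a minimal sketch of the same idea without the custom UDFs. It assumes that the TIMESTARTED line appears in the input and that only one such value is wanted. The aliases lines, header, dataRows, oneHeader and withDate and the field name dateStarted are hypothetical; TextLoader, LIMIT and REPLACE are Pig built-ins standing in for the UDFs from the question, not what the original script does.

```pig
-- Sketch only: aliases are hypothetical, paths are the ones from the question.
-- Read every input line as a single chararray field.
lines = LOAD 's3://bucketName/input/' USING TextLoader() AS (line:chararray);

-- Separate the TIMESTARTED line from the data lines.
header   = FILTER lines BY line matches '.*TIMESTARTED.*';
dataRows = FILTER lines BY NOT (line matches '.*TIMESTARTED.*');

-- A relation used as a scalar may hold at most one row;
-- LIMIT 1 guards against a runtime failure if several lines match.
oneHeader = LIMIT header 1;

-- oneHeader.line projects a column out of the relation, which is what
-- the "needs to be projected" error message asks for.
-- REPLACE strips the 'TIMESTARTED=' prefix, leaving e.g. 20160101.
withDate = FOREACH dataRows GENERATE line,
           REPLACE(oneHeader.line, 'TIMESTARTED=', '') AS dateStarted;

STORE withDate INTO 's3://bucketName/output/' USING PigStorage('|');
```

The key point is the relation.field form: passing the bare relation name (as in FormatTimestartedUDF(timestarted)) triggers the error, while projecting one column from a single-row relation lets Pig treat it as a scalar and repeat it on every output row, with no explicit JOIN.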
