使用级联框架,我过滤了一些元组并将它们输出到S3文件中。
我还想计算输出的总元组的数量。一种简单的方法是下载输出S3文件并计算行数。
是否有其他方法将输出元组的计数转储到另一个文件?
这可以使用流流程来完成。
我们可以编写一个自定义函数
public class Counter extends BaseOperation implements Function {
...
@Override
public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
functionCall.getOutputCollector().add(functionCall.getArguments());
flowProcess.increment(counterGroup, counterName, 1);
}
...
}
使用:
groupByPipe = new Each(groupByPipe, new Counter(COUNTER_GROUP_NAME, Counter));