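I wrote a MapReduce job in which my keys and values are composite. I need to iterate over the values twice, so I tried caching them, but the same values keep repeating. Please help.
Below is my reducer class.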
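// Imports (at the top of the enclosing job's source file):
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public static class Reducerclass extends Reducer<Text, Text, Text, Text> {
    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss a");
    private MultipleOutputs<Text, Text> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    protected void reduce(Text rkey, Iterable<Text> rvalue, Context context)
            throws IOException, InterruptedException {
        // Cache copies of the values for the second pass.
        List<Text> cachedValues = new ArrayList<Text>();
        for (Text writable : rvalue) {
            System.out.println("first iteration: " + writable);
            cachedValues.add(new Text(writable));
            context.write(new Text(rkey + ", "), new Text(writable + "--> first iteration"));
        }
        int size = cachedValues.size();
        for (Text cached : cachedValues) {
            System.out.println("second iteration: " + cached);
            context.write(new Text(rkey + ", "),
                    new Text(cached + "--> Second iteration--->" + "Array Size -->" + size));
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close(); // MultipleOutputs must be closed when the task ends
    }
}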
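For context on the caching step: Hadoop reuses a single Writable instance for every value the reduce-side iterator returns, so caching the references themselves would make every cached entry show the last value read. The reducer above avoids this by copying each value with new Text(writable), so the cache itself is not the problem. The plain-Java sketch below illustrates the difference; Holder, ReusingIterator and CacheDemo are hypothetical stand-ins for Text and Hadoop's value iterator, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical mutable holder standing in for a Hadoop Text value.
class Holder {
    String value;

    Holder(String value) { this.value = value; }

    // Copy constructor, analogous to new Text(text).
    Holder(Holder other) { this.value = other.value; }
}

// Hypothetical iterator that, like Hadoop's reduce-side value iterator,
// returns the SAME Holder object every time and overwrites its contents.
class ReusingIterator implements Iterator<Holder> {
    private final Iterator<String> source;
    private final Holder shared = new Holder("");

    ReusingIterator(List<String> values) { this.source = values.iterator(); }

    @Override
    public boolean hasNext() { return source.hasNext(); }

    @Override
    public Holder next() {
        shared.value = source.next(); // overwrite the reused instance
        return shared;
    }
}

class CacheDemo {
    // Caches the references returned by the iterator (the buggy pattern).
    static List<String> cacheReferences(List<String> input) {
        List<Holder> cache = new ArrayList<>();
        ReusingIterator it = new ReusingIterator(input);
        while (it.hasNext()) cache.add(it.next());
        List<String> out = new ArrayList<>();
        for (Holder h : cache) out.add(h.value);
        return out;
    }

    // Caches a copy of each value (the pattern used in the reducer above).
    static List<String> cacheCopies(List<String> input) {
        List<Holder> cache = new ArrayList<>();
        ReusingIterator it = new ReusingIterator(input);
        while (it.hasNext()) cache.add(new Holder(it.next()));
        List<String> out = new ArrayList<>();
        for (Holder h : cache) out.add(h.value);
        return out;
    }
}
```

With input a, b, c, cacheReferences yields c, c, c while cacheCopies yields a, b, c.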
Input file:
1509075052824 13.0619798 80.1468367
1509075112825 13.07537311 80.19612851
1509073985114 13.0507832 80.25069245
1509075072824 12.91690859 80.06168244
Expected output:
first iteration: 1509075052824 13.0619798 80.1468367
first iteration: 1509075112825 13.07537311 80.19612851
first iteration: 1509073985114 13.0507832 80.25069245
first iteration: 1509075072824 12.91690859 80.06168244
second iteration: 1509075052824 13.0619798 80.1468367
second iteration: 1509075112825 13.07537311 80.19612851
second iteration: 1509073985114 13.0507832 80.25069245
second iteration: 1509075072824 12.91690859 80.06168244
Current output:
1509075042823 12.91877675 80.0466234--> first iteration
1509075042823 12.91877675 80.0466234--> Second iteration--->Array Size -->1
1509074972821 12.91738175 80.05294765--> first iteration
1509074972821 12.91738175 80.05294765--> Second iteration--->Array Size -->1
1509073795109 13.05561879 80.11920979--> first iteration
1509073795109 13.05561879 80.11920979--> Second iteration--->Array Size -->1
1509075132826 12.97988349 80.16310309--> first iteration
1509075132826 12.97988349 80.16310309--> Second iteration--->Array Size -->1
1509073885111 13.06640175 80.2457003--> first iteration
1509073885111 13.06640175 80.2457003--> Second iteration--->Array Size -->1
Thanks in advance!
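The "Array Size -->1" in your output shows that each reduce() call receives only one value, i.e. every key is distinct. If you want all the values collected into a single ArrayList, they must all arrive in a single reduce() call (and therefore at a single reducer).
To do that, your mapper must always emit the same rkey.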
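To see why the key matters, the sketch below simulates the shuffle's group-by-key step in plain Java (no Hadoop), using the four records from the question's input file. The two mapper functions are hypothetical: one keys each record by its timestamp, so every key is unique, and the other emits one constant key for every record. reduce() would be called once per group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the shuffle/sort phase: (key, value) pairs are
// grouped by key, and reduce() would be called once per distinct key.
class ShuffleSketch {

    static Map<String, List<String>> groupByKey(List<String[]> pairs) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : pairs) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    // Hypothetical mapper: keys each record by its timestamp (first field),
    // so every key is distinct and each group holds one value.
    static List<String[]> mapWithUniqueKeys(List<String> lines) {
        List<String[]> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.add(new String[] {line.split(" ")[0], line});
        }
        return pairs;
    }

    // Hypothetical mapper: emits the same constant key for every record.
    static List<String[]> mapWithConstantKey(List<String> lines, String key) {
        List<String[]> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.add(new String[] {key, line});
        }
        return pairs;
    }
}
```

With unique keys the four records form four groups of size 1 (matching "Array Size -->1"); with a constant key they form one group of size 4. The trade-off is that a single key sends all the data to one reducer, so this only scales as far as one reducer can handle.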
I iterate over the values twice by marking the iterator first and resetting it after the first pass. The key is to use ReduceContext.ValueIterator:
// requires: import org.apache.hadoop.mapreduce.ReduceContext;
ReduceContext.ValueIterator<Text> iterator =
        (ReduceContext.ValueIterator<Text>) rvalue.iterator();
iterator.mark(); // mark the first position (mark/reset throw IOException)
// first pass
while (iterator.hasNext()) {
    Text value = iterator.next();
    // do your things
}
// rewind to the mark
iterator.reset();
// second pass
while (iterator.hasNext()) {
    Text value = iterator.next();
    // do your things
}
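To make the mark()/reset()/clearMark() semantics concrete without a cluster, here is a plain-Java, in-memory sketch. BufferingMarkableIterator is a hypothetical class written only for illustration; Hadoop's own ValueIterator manages the replayed values internally (and may write them to local files) rather than in a simple List.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory sketch of mark()/reset()/clearMark() semantics.
// Not Hadoop's ReduceContext.ValueIterator; it only mimics the behavior.
class BufferingMarkableIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final List<T> buffer = new ArrayList<>(); // values recorded since mark()
    private boolean marked = false;
    private int replayPos = -1; // index into buffer while replaying, -1 otherwise

    BufferingMarkableIterator(Iterator<T> source) {
        this.source = source;
    }

    // Start recording so that reset() can return to the current position.
    void mark() {
        if (replayPos >= 0) {
            buffer.subList(0, replayPos).clear(); // keep only values not yet replayed
            replayPos = 0;
        } else {
            buffer.clear();
        }
        marked = true;
    }

    // Rewind to the last mark; recorded values are replayed first.
    void reset() {
        if (!marked) {
            throw new IllegalStateException("reset() called without an active mark()");
        }
        replayPos = 0;
    }

    // Stop recording; values already buffered are still returned by the current replay.
    void clearMark() {
        marked = false;
        if (replayPos < 0) {
            buffer.clear();
        }
    }

    @Override
    public boolean hasNext() {
        return (replayPos >= 0 && replayPos < buffer.size()) || source.hasNext();
    }

    @Override
    public T next() {
        if (replayPos >= 0) {
            if (replayPos < buffer.size()) {
                return buffer.get(replayPos++); // replay a recorded value
            }
            replayPos = -1; // replay finished, continue with the source
            if (!marked) {
                buffer.clear();
            }
        }
        T value = source.next(); // throws NoSuchElementException when exhausted
        if (marked) {
            buffer.add(value); // record it so a later reset() can replay it
        }
        return value;
    }
}
```

Usage follows the same order as the Hadoop snippet: mark() before the first pass, reset() after it, and optionally clearMark() before the second pass. In this sketch, calling reset() again after clearMark() throws IllegalStateException.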
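After adding mark(), the task writes _0_0.out on every next() call. The mark cannot be cleared during the first pass, because reset() needs it, but it can be cleared right after reset(), before the second pass: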
ReduceContext.ValueIterator<Text> iterator =
        (ReduceContext.ValueIterator<Text>) rvalue.iterator();
iterator.mark(); // mark the first position
// first pass
while (iterator.hasNext()) {
    Text value = iterator.next();
    // do your things
}
// rewind to the mark
iterator.reset();
// reset() is done, so the mark is no longer needed
iterator.clearMark();
// second pass
while (iterator.hasNext()) {
    Text value = iterator.next();
    // do your things
}
// The same Iterable/iterator object is reused for every key group, so call
// reset() to clear the clearMarkFlag before the next key. The exception it
// may throw is ignored so the job keeps running.
try {
    iterator.reset();
} catch (Exception ignored) {
    // intentionally ignored, see the comment above
}