Iterating twice over the values in a MapReduce reducer



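I wrote a reduce job in which both my key and my value are composite. I need to iterate over the values twice, so I tried caching them, but the same values are being repeated. Please help.

Below is my reducer class.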

    public static class Reducerclass extends Reducer<Text, Text, Text, Text> {
        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss a");
        private MultipleOutputs<Text, Text> multipleOutputs;

        @Override
        public void setup(Context context) {
            multipleOutputs = new MultipleOutputs<Text, Text>(context);
        }

        @Override
        public void reduce(Text rkey, Iterable<Text> rvalue, Context context)
                throws IOException, InterruptedException {
            // Cache a copy of every value so the group can be read a second time.
            ArrayList<Text> cachedValues = new ArrayList<Text>();
            Iterator<Text> iterator = rvalue.iterator();
            while (iterator.hasNext()) {
                Text writable = iterator.next();
                System.out.println("first iteration: " + writable);
                cachedValues.add(new Text(writable));
                context.write(new Text(rkey + ", "), new Text(writable + "--> first iteration"));
            }
            int size = cachedValues.size();
            for (int i = 0; i < size; ++i) {
                System.out.println("second iteration: " + cachedValues.get(i));
                context.write(new Text(rkey + ", "), new Text(cachedValues.get(i) + "--> Second iteration--->" + "Array Size -->" + size));
            }
        }
    }

Input file:

1509075052824 13.0619798 80.1468367
1509075112825 13.07537311 80.19612851
1509073985114 13.0507832 80.25069245
1509075072824 12.91690859 80.06168244

Expected output:

first iteration: 1509075052824 13.0619798 80.1468367
first iteration: 1509075112825 13.07537311 80.19612851
first iteration: 1509073985114 13.0507832 80.25069245
first iteration: 1509075072824 12.91690859 80.06168244
second iteration: 1509075052824 13.0619798 80.1468367
second iteration: 1509075112825 13.07537311 80.19612851
second iteration: 1509073985114 13.0507832 80.25069245
second iteration: 1509075072824 12.91690859 80.06168244

Current output:

1509075042823 12.91877675 80.0466234--> first iteration
1509075042823 12.91877675 80.0466234--> Second iteration--->Array Size -->1
1509074972821 12.91738175 80.05294765--> first iteration
1509074972821 12.91738175 80.05294765--> Second iteration--->Array Size -->1
1509073795109 13.05561879 80.11920979--> first iteration
1509073795109 13.05561879 80.11920979--> Second iteration--->Array Size -->1
1509075132826 12.97988349 80.16310309--> first iteration
1509075132826 12.97988349 80.16310309--> Second iteration--->Array Size -->1
1509073885111 13.06640175 80.2457003--> first iteration
1509073885111 13.06640175 80.2457003--> Second iteration--->Array Size -->1

Thanks in advance!

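If you want to collect everything in the reducer into one single ArrayList, you need a single reducer. Your current output shows an array size of 1 for every record, which means each value arrives under its own key in its own reduce() call.

To do that, you need the mapper to always output the same rkey, so that all values land in the same reduce() call.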
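To see why the key choice matters, here is a minimal plain-Java sketch that imitates how values are grouped by key before reduce() is called. It uses no Hadoop classes. The class `GroupingDemo`, the method `group`, the constant key `"all"`, and the choice of the timestamp field as the per-record key are all assumptions made for illustration; the question does not show what the real mapper emits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of grouping by key; this is NOT Hadoop code.
final class GroupingDemo {

    // Groups records by key, the way one key's values reach a single reduce() call.
    static Map<String, List<String>> group(List<String> records, boolean constantKey) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String record : records) {
            // Assumption: the key is either the timestamp field or one shared constant.
            String key = constantKey ? "all" : record.split(" ")[0];
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "1509075052824 13.0619798 80.1468367",
                "1509075112825 13.07537311 80.19612851",
                "1509073985114 13.0507832 80.25069245",
                "1509075072824 12.91690859 80.06168244");
        // Distinct keys: four groups, each holding one value.
        System.out.println(group(records, false).size());           // 4
        // Constant key: one group holding all four values.
        System.out.println(group(records, true).get("all").size()); // 4
    }
}
```

With distinct keys every group has exactly one value, which matches the "Array Size -->1" lines in the question. With a constant key the single group holds every record, at the cost of funnelling all data through one reduce() call.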

I would first mark the iterator so that it can be traversed twice, then reset it after the first iteration. The key is to use ReduceContext.ValueIterator.

    ReduceContext.ValueIterator<Text> iterator = (ReduceContext.ValueIterator<Text>) rvalue.iterator();
    iterator.mark(); // mark the first position
    // iterate the first time
    while (iterator.hasNext()) {
        Text value = iterator.next();
        // do your things
    }
    // reset the iterator back to the mark
    iterator.reset();
    // iterate the second time
    while (iterator.hasNext()) {
        Text value = iterator.next();
        // do your things
    }
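As a rough illustration of what mark and reset mean, the following plain-Java sketch buffers every value read after `mark()` and replays the buffer after `reset()`. It is a toy model only, not Hadoop's implementation: `MarkResetDemo`, `ReplayIterator`, and `twoPasses` are hypothetical names, and the real iterator may spill to disk rather than keep values in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy illustration of mark/reset semantics; NOT Hadoop's implementation.
final class MarkResetDemo {

    // Hypothetical helper: buffers values read after mark() so they can be replayed.
    static final class ReplayIterator<T> implements Iterator<T> {
        private final Iterator<T> source;
        private final List<T> buffer = new ArrayList<>();
        private int replayIndex = 0;   // next buffered position to hand out
        private boolean marked = false;

        ReplayIterator(Iterator<T> source) { this.source = source; }

        // Remember the current position; values before it are dropped.
        void mark() { dropReplayed(); marked = true; }

        // Go back to the marked position; fails if no mark is set.
        void reset() {
            if (!marked) throw new IllegalStateException("reset() called without a mark");
            replayIndex = 0;
        }

        // Stop buffering; values still waiting in the buffer are replayed once.
        void clearMark() { dropReplayed(); marked = false; }

        private void dropReplayed() {
            buffer.subList(0, replayIndex).clear();
            replayIndex = 0;
        }

        @Override
        public boolean hasNext() { return replayIndex < buffer.size() || source.hasNext(); }

        @Override
        public T next() {
            if (replayIndex < buffer.size()) return buffer.get(replayIndex++);
            T value = source.next();
            if (marked) { buffer.add(value); replayIndex++; }
            return value;
        }
    }

    // Reads the same values twice through one iterator using mark() and reset().
    static List<List<String>> twoPasses(List<String> values) {
        ReplayIterator<String> it = new ReplayIterator<>(values.iterator());
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        it.mark();
        while (it.hasNext()) first.add(it.next());
        it.reset();
        while (it.hasNext()) second.add(it.next());
        List<List<String>> passes = new ArrayList<>();
        passes.add(first);
        passes.add(second);
        return passes;
    }

    public static void main(String[] args) {
        System.out.println(twoPasses(Arrays.asList("a", "b", "c"))); // [[a, b, c], [a, b, c]]
    }
}
```

In this toy model, calling `reset()` after `clearMark()` throws, which is why a mark has to stay in place until the reset has happened.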

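Once mark() has been added, the task writes to a _0_0.out file on every next(). The mark cannot be cleared during the first iteration, because reset() needs it. We can clear it after the reset instead: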

    ReduceContext.ValueIterator<Text> iterator = (ReduceContext.ValueIterator<Text>) rvalue.iterator();
    iterator.mark(); // mark the first position
    // iterate the first time
    while (iterator.hasNext()) {
        Text value = iterator.next();
        // do your things
    }
    // reset the iterator back to the mark
    iterator.reset();
    // clear the mark now that reset() no longer needs it
    iterator.clearMark();
    // iterate the second time
    while (iterator.hasNext()) {
        Text value = iterator.next();
        // do your things
    }
    // The same Iterable instance is reused for every key group, so call reset()
    // once more to clear the clearMark flag, ignoring the exception so the job keeps running.
    try {
        iterator.reset();
    } catch (Exception ignored) {}
