是否可以"merge" Java8 流的元素?



我可以用Java8流分析上一个和/或下一个元素吗?

例如,我能计数相同的相邻数吗?

public class Merge {
   public static void main(String[] args) {
      Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
      // How to get 3, 2, 2, 4 from above
   }
}

如果你想让它变懒,你必须通过Stream.iterator()Stream.spliterator()转义流API。

否则,可以使用自定义收集器调用终端操作Stream.collect(Collector),该操作将消耗整个流。


@Test
public void test() {
    Stream<Integer> input = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
    UniqCountSpliterator uniqCountSpliterator = new UniqCountSpliterator(input.spliterator());
    long[] output = uniqCountSpliterator.stream()
            .toArray();
    long[] expected = {3, 2, 2, 4};
    assertArrayEquals(expected, output);
}

import java.util.Spliterator;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class UniqCountSpliterator implements Spliterator.OfLong {
    private Spliterator wrapped;
    private long count;
    private Object previous;
    private Object current;
    public UniqCountSpliterator(Spliterator wrapped) {
        this.wrapped = wrapped;
    }
    public LongStream stream() {
        return StreamSupport.longStream(this, false);
    }
    @Override
    public OfLong trySplit() {
        return null;
    }
    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }
    @Override
    public int characteristics() {
        return NONNULL | IMMUTABLE;
    }
    @Override
    public boolean tryAdvance(LongConsumer action) {
        while (wrapped.tryAdvance(next -> current = next) && (null == previous || current.equals(previous))) {
            count++;
            previous = current;
        }
        if (previous == null) {
            return false;
        }
        action.accept(count);
        count = 1;
        previous = null;
        return true;
    }
}

您几乎可以使用flatMap完成此操作。它将适用于无限流,有限流,我没有看到一种方法来检测流的结束从它里面。

    Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
    Stream<Integer> flatMap = stream.flatMap(new Function<Integer, Stream<Integer>>() {
        Integer prev = null;
        int count;
        public java.util.stream.Stream<Integer> apply(Integer i) {
            if ( i.equals(prev)) {
                count++;
                return Stream.empty();
            } else {
                int c = count;
                count = 1;
                prev = i;
                if ( c > 0 ) {
                    return Stream.of(c);
                } else {
                    return Stream.empty();
                }
            }
        };
    });
    flatMap.forEach(i -> {
        System.out.println(i);
    });

也就是说,你可以从rxjava中获得更好的效果(你可以使用Subject来发出你想要的值,并且能够检测到流的结束)。

当然,如果您想要转义流边界,有许多选项,正如christoffer的回答所示。

如果您不介意两个语句,您可以设置一个列表来填充计数,然后使用reduce:

List<Integer> counts = new ArrayList<>();
Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1).reduce((i, j) -> {
    if (counts.isEmpty()) {
        counts.add(1);
    }
    if (j == i) {
        int index = counts.size() - 1;
        counts.set(index, counts.get(index) + 1);
    } else {
        counts.add(1);
    }
    return j;
});

您可以使用reduce函数来合并TreeMap中的项。如果只需要计数,则可以获取映射的值。

public class Merge {
   public static void main(String[] args) {
      Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
      Map<Integer,Integer> map = stream.reduce(new TreeMap<Integer,Integer>(), (map, n) -> {
          if (map.isEmpty() || map.lastKey() != n)
              map.put(n, 1);
          else{
              map.put(map.lastKey(), map.lastEntry().getValue() + 1);
          }
          return map;
      }, (list,list2) -> list);
      Collection<Integer> numbers = map.values();
   }
}

Stream::collect()可以为您做到这一点。为了简洁起见,这里使用了一个技巧:由于输入和输出都是数字,特别是int,中间存储可以是int[2],其中第一个元素是我们要计数的东西(在示例中是01),第二个元素是计数器。在后面的帖子中会有"真实的"。something-counter对。

Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
List<Integer> result = stream.collect(
    ArrayList<int[]>::new,
    (list, i) -> {
        if (list.isEmpty() || list.get(list.size() - 1)[0] != i)
            list.add(new int[] { i, 1 });
        else
            list.get(list.size() - 1)[1]++;
    },
    (l1, l2) -> {
        if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1)[0] != l2.get(0)[0])
            l1.addAll(l2);
        else {
            l1.get(l1.size() - 1)[1] += l2.get(0)[1];
            l1.addAll(l2.subList(1, l2.size()));
        }
    }
).stream().map(pair -> pair[1]).collect(Collectors.toList());
System.out.println(result);

ArrayList<int[]>::new是供应商,它在需要时创建中间存储。类似于"身份"。在reduce()中,但它可以被重用。

accumulator函数((list, i)的东西)检查list是否为空或者它的最后一个元素计数不是i,在这种情况下,添加一个新的pair,初始化为i作为元素,1作为计数。否则,它只是在计算与i相同类型的元素时增加现有计数器。collect()做可变累积,因此不返回任何东西(不像reduce())。

还有一个"组合符";((l1, l2)之类的东西),它必须能够将两个部分结果组合成一个(两个中的第一个)。这里我们必须准备好一个部分结果可能会以下一个部分结果的开始结束,这就是if正在检查的:两个列表可能是"盲目的"。如果它们中的任何一个是空的(不太可能,但谁知道呢),或者第一个列表的最后一个元素计数的是第二个列表的第一个元素以外的东西(这里也很方便,我们已经知道列表不是空的,所以最后/第一个元素存在)。否则,我们必须更新第一个列表的最后一个元素(使用第二个列表中第一个元素的计数器),并且只追加剩余的元素。

此时我们有一个int[2] s的列表,一个单独的map - collect对将它们剥离成我们想要的对应部分。

打印输出是[3, 2, 2, 4] by The way.

这样的部分结果和合并它们的需要是如何产生的?一种可能的情况是它们可以并行工作。这是一个带有日志记录的变体,还有一个"属性"。pair对象代替int[2]。这是不正确的,但Map.Entry<key,value>可以作为一对使用。这是一个但笨拙的(就像需要getValue() - setValue()而不是++),但现在输入可以是任何东西,而不仅仅是数字。输入也被更改了,但只是为了日志记录的目的,它也可以与原始的工作。

Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 2, 2, 3, 3, 3, 3);
System.out.println(
    stream.parallel().collect(
        ArrayList<Map.Entry<Integer, Integer>>::new,
        (list, i) -> {
            System.out.println("acc " + list + " " + i + " " + Thread.currentThread());
            if (list.isEmpty() || list.get(list.size() - 1).getKey() != i)
                list.add(new AbstractMap.SimpleEntry<Integer, Integer>(i, 1));
            else {
                var p = list.get(list.size() - 1);
                p.setValue(p.getValue() + 1);
            }
        }, (l1, l2) -> {
            System.out.println("comb " + l1 + " " + l2 + " " + Thread.currentThread());
            if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1).getKey() != l2.get(0).getKey())
                l1.addAll(l2);
            else {
                var p = l1.get(l1.size() - 1);
                p.setValue(p.getValue() + l2.get(0).getValue());
                l1.addAll(l2.subList(1, l2.size()));
            }
        }
    )
);

它可能需要运行几次,但有时它实际上是多线程运行的,产生如下输出:

acc [] 2 Thread[main,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[main,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [2=1] [3=1] Thread[main,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [1=1] [1=1] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 2 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=1] [2=1, 3=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [1=2] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [3=1] [3=1] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [3=1] [3=2] Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [0=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=2] [0=1, 1=2] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=2, 3=1] [3=3] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [0=3, 1=2] [2=2, 3=4] Thread[ForkJoinPool.commonPool-worker-5,5,main]
[0=3, 1=2, 2=2, 3=4]

什么是可见的在这个特定的单独运行是所有11个输入值积累(acc [] x行,[]表明,通过了一项全新的空列表),在一个4线程(主线程和工作者线程3-5-7),这些初始步骤发生在任意顺序,然后结果(但这里的秩序维护)相结合,因此特别结合步骤(当列表不只是附加,但是一个计数器需要更新)确实是经常使用。
这里跳过最后的对数转换,这就是为什么元素和它们的计数都被打印出来的原因。

比较一下,相同的代码,只是没有调用parallel(),只是使用累加器函数,按顺序遍历输入流。我不确定在这种顺序的情况下是否会发生组合,也许对于大的输入。

acc [] 0 Thread[main,5,main]
acc [0=1] 0 Thread[main,5,main]
acc [0=2] 0 Thread[main,5,main]
acc [0=3] 1 Thread[main,5,main]
acc [0=3, 1=1] 1 Thread[main,5,main]
acc [0=3, 1=2] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=1] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=1] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=3] 3 Thread[main,5,main]
[0=3, 1=2, 2=2, 3=4]

最新更新