潜在无限流的功能解压缩实现



我已经实现了功能unzip()操作,如下所示:

public static <T, U, V> Tuple2<Stream<U>, Stream<V>> unzip(
        Stream<T> stream, 
        Function<T, Tuple2<U, V>> unzipper) {
    return stream.map(unzipper)
        .reduce(new Tuple2<>(Stream.<U>empty(), Stream.<V>empty()),
            (unzipped, tuple) -> new Tuple2<>(
                Stream.concat(unzipped.$1(), Stream.of(tuple.$1())),
                Stream.concat(unzipped.$2(), Stream.of(tuple.$2()))),
            (unzipped1, unzipped2) -> new Tuple2<>(
                Stream.concat(unzipped1.$1(), unzipped2.$1()),
                Stream.concat(unzipped1.$2(), unzipped2.$2())));
}

这工作正常,因为输入流没有很多元素。这是因为访问深度串联流的元素可能会导致StackOverflowException。根据Stream.concat()的文件:

实施说明:

从重复串联构造流时要小心。访问深度串联流的元素可能会导致深度调用链,甚至StackOverflowException

对于少数元素,我的unzip实现有效。给定一个类Person

class Person {
    public final String name;
    public final int age;
    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

如果我有一连串的人:

Stream<Person> people = Stream.of(
    new Person("Joe", 52), 
    new Person("Alan", 34), 
    new Person("Peter", 42));

我可以通过以下方式使用我的unzip()实现:

Tuple2<Stream<String>, Stream<Integer>> result = StreamUtils.unzip(people, 
        person -> new Tuple2<>(person.name, person.age));
List<String> names = result.$1()
    .collect(Collectors.toList()); // ["Joe", "Alan", "Peter"]
List<Integer> ages = result.$2()
    .collect(Collectors.toList()); // [52, 34, 42]

这是正确的。

所以我的问题是:unzip()有没有办法处理许多元素(可能是无限的)?

注意:为了完整起见,这是我的不可变Tuple2类:

public final class Tuple2<A, B> {
    private final A $1;
    private final B $2;
    public Tuple2(A $1, B $2) {
        this.$1 = $1;
        this.$2 = $2;
    }
    public A $1() {
        return $1;
    }
    public B $2() {
        return $2;
    }
}

您的解决方案不仅容易出现潜在的StackOverflowError,而且即使不存在StackOverflowError的风险,也远不能处理潜在的无限流。关键是,您正在构造一个流,但它是串联的单元素流的流,源流的每个元素一个。换句话说,在返回 unzip 方法时,您将拥有一个完全具体化的数据结构,这将比收集到ArrayList或简单toArray()操作的结果消耗更多的内存。

但是,当您想在之后执行collect时,无论如何,支持潜在无限流的想法都是没有意义的,因为收集意味着处理所有元素而不会短路。

一旦您放弃支持无限流的想法并专注于收集操作,就会有一个更简单的解决方案。从这个解决方案中获取代码,用Tuple2替换Pair并将累加器逻辑从"条件"更改为"两者",我们得到:

public static <T, A1, A2, R1, R2> Collector<T, ?, Tuple2<R1,R2>> both(
    Collector<T, A1, R1> first, Collector<T, A2, R2> second) {
    Supplier<A1> s1=first.supplier();
    Supplier<A2> s2=second.supplier();
    BiConsumer<A1, T> a1=first.accumulator();
    BiConsumer<A2, T> a2=second.accumulator();
    BinaryOperator<A1> c1=first.combiner();
    BinaryOperator<A2> c2=second.combiner();
    Function<A1,R1> f1=first.finisher();
    Function<A2,R2> f2=second.finisher();
    return Collector.of(
        ()->new Tuple2<>(s1.get(), s2.get()),
        (p,t)->{ a1.accept(p.$1(), t); a2.accept(p.$2(), t); },
        (p1,p2)->new Tuple2<>(c1.apply(p1.$1(), p2.$1()), c2.apply(p1.$2(), p2.$2())),
        p -> new Tuple2<>(f1.apply(p.$1()), f2.apply(p.$2())));
}

这可以像

Tuple2<List<String>, List<Integer>> namesAndAges=
    Stream.of(new Person("Joe", 52), new Person("Alan", 34), new Person("Peter", 42))
        .collect(both(
            Collectors.mapping(p->p.name, Collectors.toList()),
            Collectors.mapping(p->p.age,  Collectors.toList())));
List<String> names = namesAndAges.$1(); // ["Joe", "Alan", "Peter"]
List<Integer> ages = namesAndAges.$2(); // [52, 34, 42]

链接答案的陈述在这里也成立。您几乎可以执行所有可以表示为收集器中的流操作的操作。

如果你想用一个函数更接近你的原始代码,从流元素映射到一个Tuple2,你可以包装上面的解决方案,如

public static <T, T1, T2, A1, A2, R1, R2> Collector<T, ?, Tuple2<R1,R2>> both(
    Function<? super T, ? extends Tuple2<? extends T1, ? extends T2>> f,
    Collector<T1, A1, R1> first, Collector<T2, A2, R2> second) {
    return Collectors.mapping(f, both(
            Collectors.mapping(Tuple2::$1, first),
            Collectors.mapping(Tuple2::$2, second)));
}

并像使用它一样使用

Tuple2<List<String>, List<Integer>> namesAndAges=
    Stream.of(new Person("Joe", 52), new Person("Alan", 34), new Person("Peter", 42))
        .collect(both(
            p -> new Tuple2<>(p.name, p.age), Collectors.toList(), Collectors.toList()));

您可能会识别函数p -> new Tuple2<>(p.name, p.age),就像您传递给unzip方法一样。上述解决方案是惰性的,但需要将"解压"后的操作表示为收集器。如果你想要Stream,并接受解决方案的非懒惰性质,就像你原来的unzip操作一样,但希望它比concat更有效,你可以使用:

public static <T, U, V> Tuple2<Stream<U>, Stream<V>> unzip(
    Stream<T> stream,  Function<T, Tuple2<U, V>> unzipper) {
    return stream.map(unzipper)
        .collect(Collector.of(()->new Tuple2<>(Stream.<U>builder(), Stream.<V>builder()),
            (unzipped, tuple) -> {
                unzipped.$1().accept(tuple.$1()); unzipped.$2().accept(tuple.$2());
            },
            (unzipped1, unzipped2) -> {
                unzipped2.$1().build().forEachOrdered(unzipped1.$1());
                unzipped2.$2().build().forEachOrdered(unzipped1.$2());
                return unzipped1;
            },
            tuple -> new Tuple2<>(tuple.$1().build(), tuple.$2().build())
        ));
}

这可以作为基于concat的解决方案的替代品。它还将完全存储流元素,但它将使用一个针对增量填充和使用一次(在Stream操作中)的用例进行优化的Stream.Builder。这比收集到ArrayList中(至少使用参考实现)更有效,因为它使用"旋转缓冲区",在增加容量时不需要复制。对于大小可能未知的流,这是最有效的解决方案(对于已知大小的流,toArray()性能会更好)。

最新更新