Java的Stream.flatMap()的(一种)反向操作是什么?



Stream.flatMap()操作转换流

a, b, c

到每个输入元素包含零个或多个元素的流中,例如

a1, a2, c1, c2, c3

是否有相反的操作将几个元素批处理成一个新元素?

  • 它不是 .reduce((,因为这只产生一个结果
  • 它不是 collect((,因为这只填充了一个容器 (afaiu(
  • 它不是 forEach((,因为它的返回值只是void并且带有副作用

它存在吗?我能以任何方式模拟它吗?

最后我发现flatMap可以这么说是它自己的"逆"。我监督flatMap不一定会增加元素的数量。它还可以通过为某些元素发出空流来减少元素的数量。要实现分组依据操作,flatMap调用的函数需要最小的内部状态,即最新的元素。它要么返回空流,要么在组的末尾返回缩减为组代表。

这是一个快速实现,如果传入的两个元素不属于同一组,即它们之间是组边界,则必须返回truegroupBordercombiner是将例如 (1,a(、(1,a(、(1,a( 组合到 (3,a( 中的组函数,假设您的组元素是元组(整数,字符串(。

public class GroupBy<X> implements Function<X, Stream<X>>{
private final BiPredicate<X, X> groupBorder;
private final BinaryOperator<X> combiner;
private X latest = null;
public GroupBy(BiPredicate <X, X> groupBorder,
BinaryOperator<X> combiner) {
this.groupBorder = groupBorder;
this.combiner = combiner;
}
@Override
public Stream<X> apply(X elem) {
// TODO: add test on end marker as additonal parameter for constructor
if (elem==null) {
return latest==null ? Stream.empty() : Stream.of(latest);
}
if (latest==null) {
latest = elem;
return Stream.empty();
}
if (groupBorder.test(latest, elem)) {
Stream<X> result = Stream.of(latest);
latest = elem;
return result;
}
latest = combiner.apply(latest,  elem);
return Stream.empty();
}
}

但有一个警告:要传送整个流的最后一组,必须将结束标记作为最后一个元素卡在流中。上面的代码假设它是null的,但可以添加一个额外的结束标记测试器。

我无法想出不依赖于结束标记的解决方案。

此外,我也没有在传入和传出元素之间进行转换。对于唯一的操作,这将起作用。对于计数操作,上一步必须将单个元素映射到计数对象。

看看 StreamEx 中的collapse

StreamEx.of("a1", "a2", "c1", "c2", "c3").collapse((a, b) -> a.charAt(0) == b.charAt(0))
.map(e -> e.substring(0, 1)).forEach(System.out::println);

或者我的叉子功能更多:groupBysplitsliding......

StreamEx.of("a1", "a2", "c1", "c2", "c3").collapse((a, b) -> a.charAt(0) == b.charAt(0))
.map(e -> e.substring(0, 1)).forEach(System.out::println);
// a
// c
StreamEx.of("a1", "a2", "c1", "c2", "c3").splitToList(2).forEach(System.out::println);
// [a1, a2]
// [c1, c2]
// [c3]
StreamEx.of("a1", "a2", "c1", "c2", "c3").groupBy(e -> e.charAt(0))
.forEach(System.out::println);
// a=[a1, a2]
// c=[c1, c2, c3]

你可以破解你的方式。请参阅以下示例:

Stream<List<String>> stream = Stream.of("Cat", "Dog", "Whale", "Mouse")
.collect(Collectors.collectingAndThen(
Collectors.partitioningBy(a -> a.length() > 3),
map -> Stream.of(map.get(true), map.get(false))
));
IntStream.range(0, 10)
.mapToObj(n -> IntStream.of(n, n / 2, n / 3))
.reduce(IntStream.empty(), IntStream::concat)
.forEach(System.out::println);

如您所见,元素也映射到流,然后连接成一个大型流。

主要得益于上面的 StreamEx 答案 user_3380739,您可以在此处使用groupRuns文档

StreamEx.of("a1", "a2", "c1", "c2", "c3").groupRuns( t, u -> t.charAt(0) == u.charAt(0) )
.forEach(System.out::println);
// a=[a1, a2]
// c=[c1, c2, c3]

这是我想到的:

interface OptionalBinaryOperator<T> extends BiFunction<T, T, Optional<T>> {
static <T> OptionalBinaryOperator<T> of(BinaryOperator<T> binaryOperator,
BiPredicate<T, T> biPredicate) {
return (t1, t2) -> biPredicate.test(t1, t2)
? Optional.of(binaryOperator.apply(t1, t2))
: Optional.empty();
}
}
class StreamUtils {
public static <T> Stream<T> reducePartially(Stream<T> stream,
OptionalBinaryOperator<T> conditionalAccumulator) {
Stream.Builder<T> builder = Stream.builder();
stream.reduce((t1, t2) -> conditionalAccumulator.apply(t1, t2).orElseGet(() -> {
builder.add(t1);
return t2;
})).ifPresent(builder::add);
return builder.build();
}
}

不幸的是,我没有时间让它变得懒惰,但它可以通过编写一个自定义Spliterator委托给遵循上述逻辑的stream.spliterator()来完成(而不是使用stream.reduce(),这是一个终端操作(。


附言。我刚刚意识到你想要<T,U>转换,我写了关于<T,T>转换的文章。如果你可以先从T映射到U,然后使用上面的函数,那么就是这样(即使它是次优的(。

如果它更复杂,则需要在提出 API 之前定义减少/合并的条件类型(例如Predicate<T>BiPredicate<T,T>BiPredicate<U,T>,甚至可能是Predicate<List<T>>(。

有点像StreamEx,你可以手动实现Spliterator。 例如

collectByTwos(Stream.of(1, 2, 3, 4), (x, y) -> String.format("%d%d", x, y))

。使用以下代码返回 "12"、"34" 的流:

public static <X,Y> Stream<Y> collectByTwos(Stream<X> inStream, BiFunction<X,X,Y> mapping) {
Spliterator<X> origSpliterator = inStream.spliterator();
Iterator<X> origIterator = Spliterators.iterator(origSpliterator);
boolean isParallel = inStream.isParallel();
long newSizeEst = (origSpliterator.estimateSize() + 1) / 2;
Spliterators.AbstractSpliterator<Y> lCombinedSpliterator =
new Spliterators.AbstractSpliterator<>(newSizeEst, origSpliterator.characteristics()) {
@Override
public boolean tryAdvance(Consumer<? super Y> action) {
if (! origIterator.hasNext()) {
return false;
}
X lNext1 = origIterator.next();
if (! origIterator.hasNext()) {
throw new IllegalArgumentException("Trailing elements of the stream would be ignored.");
}
X lNext2 = origIterator.next();
action.accept(mapping.apply(lNext1, lNext2));
return true;
}
};
return StreamSupport.stream(lCombinedSpliterator, isParallel)
.onClose(inStream::close);
}

(我认为这对于并行流可能不正确。

最新更新