Java收集器提供一个输入列表



我正在尝试实现一个简单的收集器,它接受一个收集器列表,同时以稍微不同的方式从流中收集值。

它与Collectors.teeing非常相似,但不同之处在于它是

  1. 接收收集器列表,而不是两个
  2. 要求所有收集器生成相同类型的值

我想要的类型签名是

public static <T, R> Collector<T, ?, List<R>> list(
final List<Collector<T, ?, R>> downstreamCollectors);

创建这样一个收集器的一种方法是递归地配对teering收集器,如下所示:

public static <T, R> Collector<T, ?, List<R>> list(
final List<Collector<T, ?, R>> downstreamCollectors) {
return listrec(
Collectors.collectingAndThen(downstreamCollectors.get(0), List::of),
downstreamCollectors.stream().skip(1).toList());
}
private static <T, R> Collector<T, ?, List<R>> listrec(
final Collector<T, ?, List<R>> teedCollectors,
final List<Collector<T, ?, R>> downstreamCollectors) {
if (downstreamCollectors.size() == 0) {
return teedCollectors;
} else {
return listrec(
teeing(
teedCollectors,
downstreamCollectors.get(0),
(l, s) -> Stream.concat(l.stream(), Stream.of(s)).toList()),
downstreamCollectors.stream().skip(1).toList());
}
}

有些东西感觉有点"关闭";使用这个解决方案,所以我尝试自己创建收集器,类似于:

public static <T, R> Collector<T, ?, List<R>> list2(
final List<Collector<T, ?, R>> downstreamCollectors) {
return Collector.of(
() -> downstreamCollectors.stream().map(c -> c.supplier().get()).toList(),
(accumulators, t) ->
IntStream.range(0, downstreamCollectors.size())
.forEach(
i -> downstreamCollectors.get(i).accumulator().accept(accumulators.get(i), t)),
(accumulator1, accumulator2) ->
IntStream.range(0, downstreamCollectors.size())
.mapToObj(
i ->
downstreamCollectors
.get(i)
.combiner()
.apply(accumulator1.get(i), accumulator2.get(i)))
.toList(),
accumulators ->
IntStream.range(0, downstreamCollectors.size())
.mapToObj(i -> downstreamCollectors.get(i).finisher().apply(accumulators.get(i)))
.toList());
}

由于下游收集器的累加器类型中存在无界通配符,因此无法编译。将类型签名更改为

public static <T, A, R> Collector<? super T, ?, List<R>> list2(
final List<Collector<? super T, A, R>> downstreamCollectors);

解决了这个问题,但不幸的是,由于下游收集器(如java.util.stream.Collectors中的内置收集器(通常在累加器类型中具有无边界通配符,因此该方法的可用性大大降低。

有没有其他方法可以实现这一点,即在方法签名中保留通配符?

我使用的是OpenJDK 17.0.2。

不能以类型安全的方式处理具有任意累加器类型的收集器列表作为平面列表,因为需要声明n类型变量来捕获这些类型,其中n是实际列表大小。

因此,您只能将处理实现为操作的组合,每个操作在编译时都有有限数量的组件,就像您的递归方法一样。

这仍然具有简化的潜力,比如用downstreamCollectors.isEmpty()替换downstreamCollectors.size() == 0,或者用无复制的downstreamCollectors.subList(1, downstreamCollectors.size())替换downstreamCollectors.stream().skip(1).toList()

但最大的影响是用流减少操作取代递归代码:

public static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
return collectors.stream()
.<Collector<T, ?, List<R>>>map(c-> Collectors.collectingAndThen(c, List::of))
.reduce((c1, c2) -> teeing(c1, c2,
(l1, l2) -> Stream.concat(l1.stream(), l2.stream()).toList()))
.orElseThrow(() -> new IllegalArgumentException("no collector specified"));
}

如果你没有大量的收藏家来创作,这可能会很好。这种简洁的解决方案的一个缺点是,在实际合并结果之前,每个结果都将被封装到单个元素列表中,甚至结果合并可能会承担多个列表复制操作。

此结果处理可以使用进行优化

public static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
int num = collectors.size();
switch(num) {
case 0: throw new IllegalArgumentException("no collector specified");
case 1: return collectingAndThen(collectors.get(0), List::of);
case 2: return teeing(collectors.get(0), collectors.get(1), List::of);
case 3: return teeing(teeing(collectors.get(0), collectors.get(1), List::of),
collectors.get(2), (l,r) -> List.of(l.get(0), l.get(1), r));
default:
}
Collector<T,?,List<R>> c = teeing(collectors.get(0), collectors.get(1), (r1, r2) -> {
var list = new ArrayList<R>(num);
list.add(r1);
list.add(r2);
return list;
});
for(int ix = 2; ix < num; ix ++) {
c = teeing(c, collectors.get(ix), (list, r) -> { list.add(r); return list; });
}
return collectingAndThen(c, List::copyOf);
}

这为少数收集器提供了特殊情况,这些收集器的结果可以用于直接构建不可变的结果列表。对于其他情况,在将列表转换为最终的不可变列表之前,首先将所有结果添加到ArrayList中,以防止过度的列表复制。最后一步可以省略,如果获得一个不可变的结果列表并不重要,我只是尽可能接近原始方法的Stream.toList()行为。

在流处理过程中,幕后仍然存在一种不平衡的递归结构,它禁止大量的收集器。有两种方法可以解决这个问题。

  1. 实现您自己的类型安全的teing变体,该变体公开中间容器类型,以允许构建一个平衡树,并通过遍历该树将所有结果收集到一个列表中,而无需额外的中间存储。

  2. 放弃类型安全,使用平面列表和原始类型实现收集器。尽量限制不安全的代码。

但当您估计了要"tee"的收集器的预期数量并发现第一个解决方案足够好时,可能就不需要这样做了。

最新更新