合并器在java并行流中的实际作用是什么



我对java流中的reduce方法有基本的理解。然而,组合器在parallelStream中的作用我并不清楚。在下面的代码片段中,在第一个块中,我使用了组合器,但在第二个块中没有。然而,这两种情况的结果是相同的。

List<Integer> intarr = Arrays.asList(10,20,30);
Integer totsum = intarr.stream().reduce(20, (a,b) -> a+b, (a,b) -> a+b);
System.out.println("total sum: "+totsum);
List<Integer> intarr = Arrays.asList(10,20,30);
Integer totsum = intarr.stream().reduce(20, (a,b) -> a+b);
System.out.println("total sum: "+totsum);

在2参数reduce方法的情况下,累加器是BinaryOperator,在3参数reduce方法的情况中是BiFunction。众所周知,这将有助于类型转换。例如,如果我需要将int转换为double,我可以将我的标识指定为20.0,组合器将以double类型输出它。

但是,除了类型转换之外,使用组合器实际上还有什么优势?

组合器的目的是处理结果和输入何时属于不同类型,而这里的情况并非如此。例如,如果您想要一个String串联,您可以通过编写来使用reduce

stream.reduce("", (String str, int i) -> str + i, (String a, String b) -> a + b)

在这种情况下,当流在并行中减少时,不同的块将被单独累积,然后与组合器组合。

这并不是说使用组合器更容易"有利的";而不是使用组合器。reduce的两个过载用于不同的目的。正如您所指出的,它是类型转换。

不带组合器的reduce(T, BinaryOperator<T>)返回一个T——与流管道中的内容类型相同。另一方面,使用组合器的reduce(U, BiFunction<U, ? super T, U>, BinaryOperator<U>)返回一个U——与流中的类型不同。

BigInteger sum = bigIntegerStream.reduce(BigInteger.ZERO, BigInteger::add);

我可以使用无组合器的版本来添加一个BigIntegers流。该流将是Stream<BigInteger>,我想要的结果也是BigInteger类型。我可以使用组合器版本将Stream<Long>BigInteger相加。注意流的类型与我想要的结果类型有何不同。

BigInteger sum = longStream.reduce(BigInteger.ZERO, (bi, l) -> bi.add(BigInteger.valueOf(l)), BigInteger::add);

可以说,在使用无组合器的reduce之前,您也可以先执行map

当结果类型与流的类型不同时,需要组合器的原因是因为reduce操作可以并行运行。例如,整个流可以分成几个部分,每个部分并行减少。

如果结果类型与流类型T相同,则使用所提供的二进制运算符将每个部分缩减为一个T,而我们只剩下一堆Ts。我们可以使用相同的二进制运算符进一步将这堆Ts缩减为单个T

如果结果类型U与流类型U不同,那么BiFucntion将把每个部分缩减为U,而我们只剩下一堆U,我们不知道该怎么办,因为BiFunction只获取UT,并返回U。我们需要一个额外的BinaryOperator<U>来帮助我们组合Us。

因此,如果您想要不同的结果类型,那么组合器是必要的。


此外,20不是有效的标识值。有效身份必须满足

accumulator.apply(identity, u) == u

用于无组合器版本的所有u,以及

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

用于组合器版本的所有CCD_ 39和CCD_。

3-arg版本的签名为:

<U> U reduce(U identity,
BiFunction<U,? super T,U> accumulator,
BinaryOperator<U> combiner)

当以下其中一项(或两项(为真时使用:

  • 结果类型(U(与流元素类型(T(不同
  • combiner不同于accumulator

如果这些都不是真的,那么2-arg版本更好(更简单、更容易(:

T reduce(T identity,
BinaryOperator<T> accumulator)

对于问题中显示的示例,使用3-arg版本没有任何优势。


有很多不同类型的例子,例如,请参阅其他答案。

类型相同,但combineraccumulator的一个例子是-减去运算符:

List<Integer> intarr = Arrays.asList(10,20,30);
// Sequential processing doesn't use combiner: totsum = -60
Integer totsum = intarr.stream().reduce(0, (a,b) -> a - b, (a,b) -> a - b);
// Parallel processing with same combiner does work: totsum = -20
Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b);
Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b, (a,b) -> a - b);
// Parallel processing requires a different combiner: totsum = -60
Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b, (a,b) -> a + b);

这是因为通过并行处理,我们为该输入流获得3个线程,因此代码变为:

thread1Result = accumulator.apply(0/*identity*/, 10);  // = 0 - 10 = -10
thread2Result = accumulator.apply(0/*identity*/, 20);  // = 0 - 20 = -20
thread3Result = accumulator.apply(0/*identity*/, 30);  // = 0 - 30 = -30
// Bad combiner: (a,b) -> a - b
result = combiner.apply(thread1Result, thread2Result); // = -10 - -20 = +10
result = combiner.apply(result, thread3Result);        // = +10 - -30 = -20
// Good combiner: (a,b) -> a + b
result = combiner.apply(thread1Result, thread2Result); // = -10 + -20 = -30
result = combiner.apply(result, thread3Result);        // = -30 + -30 = -60

相关内容

  • 没有找到相关文章

最新更新