我正在练习java8并行流部分,并编写一个程序,该程序将作为参数传递的数字从0到该数字求和。
例如,如果我传递 10,它将对从 1 到 10 的数字求和并返回输出。
以下是程序
public class ParellelStreamExample {
public static void main(String[] args) {
System.out.println("Long Range value - "+ Long.MIN_VALUE + " to "+ Long.MAX_VALUE);
long startTime = System.nanoTime();
long sum = sequentailSum(100000000);
System.out.println(
"Time in sequential execution " + (System.nanoTime() - startTime) / 1000000 + " msec with sum = " + sum);
long startTime1 = System.nanoTime();
long sum1 = parellelSum(100000000);
System.out.println("Time in parallel execution " + (System.nanoTime() - startTime1) / 1000000
+ " msec with sum = " + sum1);
}
private static Long parellelSum(long n) {
return Stream.iterate(1l, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
}
private static Long sequentailSum(long n) {
return Stream.iterate(1l, i -> i + 1).limit(n).reduce(0L, Long::sum);
}
}
我收到的输出是
Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 1741 msec with sum = 5000000050000000
Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.SliceOps$1.opEvaluateParallelLazy(SliceOps.java:155)
at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:474)
at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.parellelSum(ParellelStreamExample.java:21)
at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.main(ParellelStreamExample.java:14)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Long.valueOf(Long.java:840)
at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.lambda$0(ParellelStreamExample.java:21)
at com.abhishek.javainaction.stream.parellel.ParellelStreamExample$$Lambda$3/250421012.apply(Unknown Source)
at java.util.stream.Stream$1.next(Stream.java:1033)
at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
为什么这个程序不并行运行并且发生gc开销, 相反,它应该在并行部分运行得更快,因为它使用 fork/join 框架并通过线程内部执行进程。
里面出了什么问题?
这里有几件事出错了。
- 您正在尝试使用
System.nanoTime()
而不是JMH之类的东西对代码进行基准测试。 - 您正在尝试在
Long
上模仿一个琐碎的计算(sum
(,而不是使用LongStream
。 如果 JVM 无法摆脱束缚,指针追逐的开销很容易压倒并行性的好处。 - 您正在尝试模仿由
iterate
产生的固有顺序流。 流框架将尝试通过缓冲流并将其调度到多个线程来执行您的要求,这会增加大量开销。 - 您正在有序并行流上使用
limit
。 这要求流框架执行大量额外的同步,以确保使用n
第一个元素来生成结果。 您将看到,如果将.unordered()
放入并行流中,执行时间将大大减少,但结果将是不确定的,因为您将获得一些n
元素的总和,而不一定是第一个n
元素。
正确的方法是使用 JMH 并将iterate(...).limit(...)
替换为LongStream.rangeClosed(1, n)
我明确不讨论基准缺陷(;)(。这里的主要问题似乎是对使用特定 Stream 函数及其行为的理解。
尝试类似操作:
LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum)
但公平地说,顺序的也应该改编:
LongStream.rangeClosed(1, n).reduce(0L, Long::sum)
现在我得到了这个运行时行为:
Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 90 msec with sum = 5000000050000000
Time in parallel execution 25 msec with sum = 5000000050000000
我假设,这就是你所期望的。
像其他所有 API 一样,您必须了解特定方法在做什么,尤其是在您想要并行的情况下。但正如你所看到的,即使是顺序处理也利用了使用这种不同方法的巨大优势。
查看 https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps 以了解方法的类型。
例如限制的使用:
同样,与遇到顺序有着内在联系的操作, 例如 limit((,可能需要缓冲以确保正确的排序,破坏并行性的好处。