为什么这种并行化的梅森素数计算算法挂起?



在尝试Joshua Bloch的视频中的代码时,我在原始代码中添加了.parallel(),试图使其更快一些(就执行速度而言(。结果,它开始挂起,运行 5 分钟后没有完成,而它的顺序版本在几秒钟内完成。

我想知道,以下代码并行化后挂起的原因是什么?

import java.math.BigInteger;
import java.util.stream.Stream;
import static java.math.BigInteger.ONE;
import static java.math.BigInteger.TWO;
class Scratch {
static Stream<BigInteger> primes() {
return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}
public static void main(String[] args) {
primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
.parallel()
.forEach(System.out::println);
}
}

附言

基础ForkJoinPoolparallelism为 12。

注意

这不是一个答案,但在评论中发布所有这些是不可能的,而且很难阅读。


找到每个 Mersenne 素数比前一个需要更长的时间,这可以通过删除.parallel()并将.limit(20)更改为更高的值来查看。

观察1

有人会认为在这个流上使用并行性会忽略.limit(20)指令,但事实并非如此,因为我们将限制更改为较低的值 - 例如.limit(10),我们将得到前 10 个 mersenne 素数(无序,但这是意料之中的(:

primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(10)
.parallel()
.forEach(m -> System.out.println(Thread.currentThread().getName() + ": " + m));

示例输出:

ForkJoinPool.commonPool-worker-3: 8191
ForkJoinPool.commonPool-worker-13: 131071
ForkJoinPool.commonPool-worker-19: 524287
ForkJoinPool.commonPool-worker-23: 2305843009213693951
ForkJoinPool.commonPool-worker-5: 31
ForkJoinPool.commonPool-worker-9: 7
ForkJoinPool.commonPool-worker-31: 2147483647
ForkJoinPool.commonPool-worker-27: 3
ForkJoinPool.commonPool-worker-17: 127
ForkJoinPool.commonPool-worker-7: 618970019642690137449562111

(请注意,所有人都在ForkJoinPool.commonPool上使用工人 - 有解决方法,但这与问题无关(

观察2

如果我们限制输入流按primes()获取 - 例如700,程序按预期完成:

primes().limit(700)
.map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
.parallel()
.forEach(m -> System.out.println(Thread.currentThread().getName() + ": " + m));
ForkJoinPool.commonPool-worker-19: 6...
ForkJoinPool.commonPool-worker-3: 21...
ForkJoinPool.commonPool-worker-19: 2...
ForkJoinPool.commonPool-worker-23: 1...
ForkJoinPool.commonPool-worker-5: 68...
ForkJoinPool.commonPool-worker-23: 7...
ForkJoinPool.commonPool-worker-19: 1...
ForkJoinPool.commonPool-worker-13: 4...
ForkJoinPool.commonPool-worker-3: 81...
ForkJoinPool.commonPool-worker-17: 5...
ForkJoinPool.commonPool-worker-7: 25...
ForkJoinPool.commonPool-worker-3: 12...
ForkJoinPool.commonPool-worker-13: 1...
ForkJoinPool.commonPool-worker-19: 3...
ForkJoinPool.commonPool-worker-23: 3...
ForkJoinPool.commonPool-worker-31: 1...
ForkJoinPool.commonPool-worker-5: 52...
ForkJoinPool.commonPool-worker-9: 28...
ForkJoinPool.commonPool-worker-27: 1...
ForkJoinPool.commonPool-worker-21: 1...

将输入流限制更改为更高的值(1000、1500 等(会增加程序完成所需的时间,而最终结果保持不变(20 个 mersenne 素数(。


考虑到上述两个观察结果,一个有根据的猜测是,当这个流被.parallel()时,处理的输入元素比获得所需结果所需的更多

。当需要mersenne primes计数较高(高于 ~14(并且输入流没有限制(如示例中所示(时,程序可能需要很长时间才能完成并有效地挂起。

事实证明,该程序并没有挂起,而是在检查一些大数字的初数时绊倒了。

当初始源没有限制时(@MartinBG线索的赞美(,底层Spliterator前进太多(考虑到任务(,并为下游提供更大的数字,导致极端的计算复杂性。

我已经修改了输入,所以它不会前进那么多,只返回下一个可能的素数作为下一个Spliterator块。 即,每次池中的线程准备好继续处理以下数字时,它都必须接收一个由单个数字组成的新块,这与通常观察到的大块相反。

import java.math.BigInteger;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.lang.Long.MAX_VALUE;
import static java.math.BigInteger.ONE;
import static java.math.BigInteger.TWO;
class Scratch {
static Stream<BigInteger> primes() {
return StreamSupport.stream(new Spliterator<>() {
private AtomicReference<BigInteger> seedReference = new AtomicReference<>(TWO);
private BigInteger resolveNextProbablePrime() {
return seedReference.getAndUpdate(BigInteger::nextProbablePrime);
}
@Override
public boolean tryAdvance(Consumer<? super BigInteger> action) {
action.accept(resolveNextProbablePrime());
return true;
}
@Override
public Spliterator<BigInteger> trySplit() {
return Spliterators.spliterator(new BigInteger[]{resolveNextProbablePrime()}, characteristics());
}
@Override
public long estimateSize() {
return MAX_VALUE;
}
@Override
public int characteristics() {
return ORDERED & DISTINCT & SORTED & NONNULL & IMMUTABLE & CONCURRENT;
}
}, true);
}
public static void main(String[] args) {
primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
.parallel()
.forEach(System.out::println);
}
}

该程序的新版本在几秒钟内完成,利用多个内核进行计算。

给定的解决方案仍然不能治愈遇到的特征;它只是在这种特殊情况下有效,因为获取下一个可能的素数并从中执行计算是相当安全的。

根据您的硬件,仍然可以获得 20 个数字。这里的问题是并行流可能不会给你带来太多的性能提升。删除parallel呼叫:

primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20) 
.forEach(System.out::println);

它可以在我的机器(i5-6300u,2.4GHz(上大约50秒内完成。 或:

primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
.parallel()
.sorted()
.sequential()
.forEach(System.out::println);

最新更新