我想从0-50范围内生成5个不同的随机数,然后并行地对它们执行一些运算。当我写这篇文章时,程序从未结束:
new Random().ints(0, 50)
.distinct()
.limit(5)
.parallel()
.forEach(d -> System.out.println("s: " + d));
我试过用peek调试它。我有无数条c:
线,50条d:
线,但零条l:
或s:
线:
new Random().ints(0, 50)
.peek(d -> System.out.println("c: " + d))
.distinct()
.peek(d -> System.out.println("d: " + d))
.limit(5)
.peek(d -> System.out.println("l: " + d))
.parallel()
.forEach(d -> System.out.println("s: " + d));
我的实现有什么问题?
首先,请注意.parallel()
会改变整个管道的并行状态,因此它会影响所有操作,而不仅仅是后续操作。在您的情况下
new Random().ints(0, 50)
.distinct()
.limit(5)
.parallel()
.forEach(d -> System.out.println("s: " + d));
与相同
new Random().ints(0, 50)
.parallel()
.distinct()
.limit(5)
.forEach(d -> System.out.println("s: " + d));
不能仅并行化管道的一部分。它要么平行,要么不平行。
现在回到你的问题上来。由于Random.ints
是一个无序流,所以选择了distinct
和limit
的无序实现,所以它不是这个问题的重复(问题在有序的不同实现中)。这里的问题出现在无序的limit()
实现中。为了减少可能的争用,它不检查在不同线程中发现的元素的总数,直到每个子任务获得至少128个元素或上游用完为止(参见实现,1 << 7 = 128
)。在您的案例中,上游distinct()
只发现了50个不同的元素,并拼命遍历输入,希望找到更多,但下游limit()
没有发出信号停止处理,因为它希望在检查是否达到限制之前收集至少128个元素(这不是很明智,因为限制小于128)。因此,要使这件事发挥作用,您应该选择至少(128*个CPU)不同的元素。在我的4核机器上,使用new Random().ints(0, 512)
成功,而new Random().ints(0, 511)
卡住。
为了解决这个问题,我建议按顺序收集随机数,并在那里创建一个新的流:
int[] ints = new Random().ints(0, 50).distinct().limit(5).toArray();
Arrays.stream(ints).parallel()
.forEach(d -> System.out.println("s: " + d));
我认为您想要执行一些昂贵的下游处理。在这种情况下,并行生成5个随机数并不是很有用。按顺序执行此部分会更快。
更新:提交了错误报告并提交了补丁。
您对ints(0, 50)
的呼叫
返回一个实际上不受限制的伪随机内数值流,每个都符合给定的起源(包括)和约束(不包括)。
我最初认为问题出在未终止的IntStream
上,但我重复了这个问题。
new Random().ints(0, 50)
.distinct().limit(5)
.parallel().forEach(a -> System.out.println(a));
进入无限循环,而
new Random().ints(0, 50)
.distinct().limit(5)
.forEach(a -> System.out.println(a));
饰面正确。
我的Stream知识不太好,无法解释,但很明显,并行化效果不佳(可能是由于无限流)。
最接近您尝试执行的操作的选项可能是使用iterate
和unordered
:
Random ran = new Random();
IntStream.iterate(ran.nextInt(50), i -> ran.nextInt(50))
.unordered()
.distinct()
.limit(5)
.parallel()
.forEach(System.out::println);
将无限流与distinct
和parallel
一起使用可能是昂贵的或者导致没有响应。有关更多信息,请参阅API说明或此问题。