在并行Java流中处理随机数



我想从0-50范围内生成5个不同的随机数,然后并行地对它们执行一些运算。当我写这篇文章时,程序从未结束:

new Random().ints(0, 50)
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

我试过用peek调试它。我有无数条c:线,50条d:线,但零条l:s:线:

new Random().ints(0, 50)
            .peek(d -> System.out.println("c: " + d))
            .distinct()
            .peek(d -> System.out.println("d: " + d))
            .limit(5)
            .peek(d -> System.out.println("l: " + d))
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

我的实现有什么问题?

首先,请注意.parallel()会改变整个管道的并行状态,因此它会影响所有操作,而不仅仅是后续操作。在您的情况下

new Random().ints(0, 50)
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

与相同

new Random().ints(0, 50)
            .parallel()
            .distinct()
            .limit(5)
            .forEach(d -> System.out.println("s: " + d));

不能仅并行化管道的一部分。它要么平行,要么不平行。

现在回到你的问题上来。由于Random.ints是一个无序流,所以选择了distinctlimit的无序实现,所以它不是这个问题的重复(问题在有序的不同实现中)。这里的问题出现在无序的limit()实现中。为了减少可能的争用,它不检查在不同线程中发现的元素的总数,直到每个子任务获得至少128个元素或上游用完为止(参见实现,1 << 7 = 128)。在您的案例中,上游distinct()只发现了50个不同的元素,并拼命遍历输入,希望找到更多,但下游limit()没有发出信号停止处理,因为它希望在检查是否达到限制之前收集至少128个元素(这不是很明智,因为限制小于128)。因此,要使这件事发挥作用,您应该选择至少(128*个CPU)不同的元素。在我的4核机器上,使用new Random().ints(0, 512)成功,而new Random().ints(0, 511)卡住。

为了解决这个问题,我建议按顺序收集随机数,并在那里创建一个新的流:

int[] ints = new Random().ints(0, 50).distinct().limit(5).toArray();
Arrays.stream(ints).parallel()
      .forEach(d -> System.out.println("s: " + d));

我认为您想要执行一些昂贵的下游处理。在这种情况下,并行生成5个随机数并不是很有用。按顺序执行此部分会更快。

更新:提交了错误报告并提交了补丁。

您对ints(0, 50) 的呼叫

返回一个实际上不受限制的伪随机内数值流,每个都符合给定的起源(包括)和约束(不包括)。

我最初认为问题出在未终止的IntStream上,但我重复了这个问题。

new Random().ints(0, 50)
            .distinct().limit(5)
            .parallel().forEach(a -> System.out.println(a));

进入无限循环,而

new Random().ints(0, 50)
            .distinct().limit(5)
            .forEach(a -> System.out.println(a));

饰面正确。

我的Stream知识不太好,无法解释,但很明显,并行化效果不佳(可能是由于无限流)。

最接近您尝试执行的操作的选项可能是使用iterateunordered:

Random ran = new Random();
IntStream.iterate(ran.nextInt(50), i -> ran.nextInt(50))
    .unordered()
    .distinct()
    .limit(5)
    .parallel()
    .forEach(System.out::println);

将无限流与distinctparallel一起使用可能是昂贵的或者导致没有响应。有关更多信息,请参阅API说明或此问题。

最新更新