java 8 parallelStream().for Each Result数据丢失



有两个测试用例使用parallelStream():

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
src.add(i);
}
List<String> strings = new ArrayList<>();

src.parallelStream().filter(integer -> (integer % 2) == 0).forEach(integer -> strings.add(integer + ""));

System.out.println("=size=>" + strings.size());
=size=>9332
List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
src.add(i);
}
List<String> strings = new ArrayList<>();
src.parallelStream().forEach(integer -> strings.add(integer + ""));
System.out.println("=size=>" + strings.size());
=size=>17908

为什么在使用parallelStream时总是丢失数据?我做错了什么?

ArrayList不是线程安全的。你需要做

List<String> strings = Collections.synchronizedList(new ArrayList<>());

List<String> strings = new Vector<>();

以确保所有更新同步,或切换到

List<String> strings = src.parallelStream()
.filter(integer -> (integer % 2) == 0)
.map(integer -> integer + "")
.collect(Collectors.toList());

并将列表构建留给Streams框架。请注意,collect返回的列表是否可修改是未定义的,因此如果这是一个要求,您可能需要修改您的方法。

就性能而言,Stream.collect可能比使用Stream.forEach添加到同步集合快得多,因为Streams框架可以在不同步的情况下单独处理每个线程中的值集合,并在最后以线程安全的方式组合结果。

ArrayList不是线程安全的。当一个线程看到具有30个元素的列表时,另一个线程可能仍然看到29并覆盖第30个位置(丢失1个元素(。

当支持列表的数组需要调整大小时,可能会出现另一个问题。创建一个新的数组(大小是原来的两倍(,并将原始数组中的元素复制到其中。虽然其他线程可能已经添加了内容,但进行调整大小的线程可能没有看到这一点多个线程正在调整大小,最终只有1个线程获胜。

当使用多个线程时,您需要在访问列表时进行一些同步,或者使用多线程安全列表(通过将其封装在SynchronizedList中或使用CopyOnWriteArrayList来提及2种可能的解决方案(。更好的方法是对流使用collect方法将所有内容放入列表中。

如果不小心使用,带有forEach的ParallelStream是一个致命的组合。请看以下几点以避免任何错误:
  1. 如果您有一个预先存在的列表对象,要从parallelStream循环中添加更多对象,请使用Collections.synchronizedList&在循环通过并行流之前,将预先存在的列表对象传递给它。

  2. 如果必须创建一个新列表,则可以使用Vector在循环外初始化列表。或

  3. 如果您必须创建一个新的列表,那么只需使用parallelStream并在最后收集输出即可。

当您尝试进行突变时,您将失去使用流(和并行流(的好处。一般来说,使用流时应避免突变。Venkat Subramaniam解释了原因。相反,请使用收集器。还要努力在流链中完成很多工作。例如:

System.out.println(
IntStream.range(0, 200000)
.filter(i -> i % 2 == 0)
.mapToObj(String::valueOf)
.collect(Collectors.toList()).size()
);

您可以通过添加.parallel((在parallelStream中运行它

相关内容

最新更新