有两个测试用例使用parallelStream()
:
List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
src.add(i);
}
List<String> strings = new ArrayList<>();
src.parallelStream().filter(integer -> (integer % 2) == 0).forEach(integer -> strings.add(integer + ""));
System.out.println("=size=>" + strings.size());
=size=>9332
List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
src.add(i);
}
List<String> strings = new ArrayList<>();
src.parallelStream().forEach(integer -> strings.add(integer + ""));
System.out.println("=size=>" + strings.size());
=size=>17908
为什么在使用parallelStream时总是丢失数据?我做错了什么?
ArrayList
不是线程安全的。你需要做
List<String> strings = Collections.synchronizedList(new ArrayList<>());
或
List<String> strings = new Vector<>();
以确保所有更新同步,或切换到
List<String> strings = src.parallelStream()
.filter(integer -> (integer % 2) == 0)
.map(integer -> integer + "")
.collect(Collectors.toList());
并将列表构建留给Streams框架。请注意,collect
返回的列表是否可修改是未定义的,因此如果这是一个要求,您可能需要修改您的方法。
就性能而言,Stream.collect可能比使用Stream.forEach
添加到同步集合快得多,因为Streams框架可以在不同步的情况下单独处理每个线程中的值集合,并在最后以线程安全的方式组合结果。
ArrayList
不是线程安全的。当一个线程看到具有30个元素的列表时,另一个线程可能仍然看到29并覆盖第30个位置(丢失1个元素(。
当支持列表的数组需要调整大小时,可能会出现另一个问题。创建一个新的数组(大小是原来的两倍(,并将原始数组中的元素复制到其中。虽然其他线程可能已经添加了内容,但进行调整大小的线程可能没有看到这一点或多个线程正在调整大小,最终只有1个线程获胜。
当使用多个线程时,您需要在访问列表时进行一些同步,或者使用多线程安全列表(通过将其封装在SynchronizedList
中或使用CopyOnWriteArrayList
来提及2种可能的解决方案(。更好的方法是对流使用collect
方法将所有内容放入列表中。
如果您有一个预先存在的列表对象,要从parallelStream循环中添加更多对象,请使用Collections.synchronizedList&在循环通过并行流之前,将预先存在的列表对象传递给它。
如果必须创建一个新列表,则可以使用Vector在循环外初始化列表。或
如果您必须创建一个新的列表,那么只需使用parallelStream并在最后收集输出即可。
当您尝试进行突变时,您将失去使用流(和并行流(的好处。一般来说,使用流时应避免突变。Venkat Subramaniam解释了原因。相反,请使用收集器。还要努力在流链中完成很多工作。例如:
System.out.println(
IntStream.range(0, 200000)
.filter(i -> i % 2 == 0)
.mapToObj(String::valueOf)
.collect(Collectors.toList()).size()
);
您可以通过添加.parallel((在parallelStream中运行它