Parallel streams: how much is actually parallel?



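I have a parallel stream, as shown in Runner.java below. However, I noticed that the aggregate method in the Runner class runs sequentially. I think this is because I use forEach and the loop cannot be split up, although I am not sure. My conclusion that it runs sequentially is based on the thread names I print in the getter method of the class below.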

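Although the code below builds a list of 1000 objects, I reduced the list to 10 items when printing the thread names so that I could paste a shorter log here. The logs follow Runner.java below. Looking at them, the aggregate method is always executed by the main thread and never by the common pool. I observe the same behavior with 1000 items. Can someone help me confirm whether forEach is what causes the sequential execution?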

Employee.java

public class Employee {
    private double salary;
    private String fName;
    private String lName;

    public Employee(double salary, String fName, String lName) {
        this.salary = salary;
        this.fName = fName;
        this.lName = lName;
    }

    public Employee() {
    }

    public double getSalary() {
        return salary;
    }

    public void setSalary(double salary) {
        this.salary = salary;
    }

    public String getfName() {
        // Print the calling thread so the logs show where the getter runs.
        System.out.println("Thread.currentThread() + \"for getFname\" = " + Thread.currentThread() +
                " for getFname");
        return fName;
    }

    public void setfName(String fName) {
        this.fName = fName;
    }

    public String getlName() {
        return lName;
    }

    public void setlName(String lName) {
        this.lName = lName;
    }

    @Override
    public String toString() {
        return "E{" +
                "s=" + salary +
                ", N='" + fName + '\'' +
                '}';
    }
}

Runner class:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Runner {
    public static void main(String[] args) {
        ArrayList<Employee> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            list.add(new Employee(i * 100, "A" + i, "Z" + i));
        }
        long l = System.currentTimeMillis();
        // Group in parallel by first name, then hand the finished map to aggregate.
        List<Employee> collect = list
                .parallelStream()
                .collect(Collectors.collectingAndThen(Collectors.groupingBy(Employee::getfName), Runner::aggregate));
        System.out.println("collect = " + collect.size());
        System.out.println(collect.stream().mapToDouble(e -> e.getSalary()).sum());
    }

    private static List<Employee> aggregate(Map<String, List<Employee>> t) {
        ArrayList<Employee> emps = new ArrayList<>();
        t.entrySet()
                .forEach(e -> {
                    // Print the calling thread so the logs show where the aggregation runs.
                    System.out.println("Thread.currentThread() + \"within agrgate\" = " + Thread.currentThread() + "within agrgate");
                    Employee e1 = new Employee();
                    e.getValue().forEach(r -> {
                        e1.setSalary(e1.getSalary() + r.getSalary());
                        e1.setfName(r.getfName());
                    });
                    emps.add(e1);
                });
        return emps;
    }
}

Logs:

Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-1,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-3,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-2,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-3,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-1,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname

The forEach method you are calling comes from the Iterable interface, where "actions are performed in the order of iteration ...".
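A minimal sketch of how this could be checked, assuming a plain HashSet as the collection. The class name ForEachThreadSketch and the helper threadsUsedByForEach are made up for this illustration and are not part of the question's code. The helper records the name of every thread that runs the action passed to Iterable.forEach.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper class, used only for this illustration.
class ForEachThreadSketch {

    // Records the name of every thread that runs the action passed to Iterable.forEach.
    static Set<String> threadsUsedByForEach(Iterable<?> items) {
        // A concurrent set, so the helper stays safe even if an implementation used several threads.
        Set<String> names = ConcurrentHashMap.newKeySet();
        items.forEach(item -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        Set<String> data = new HashSet<>(Arrays.asList("A0", "A1", "A2", "A3"));
        System.out.println("caller  = " + Thread.currentThread().getName());
        System.out.println("forEach = " + threadsUsedByForEach(data));
    }
}
```

For a HashSet, the only recorded name should be that of the calling thread, which is consistent with the "within agrgate" lines in the logs all showing main.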

Instead, you can use a stream for the forEach:

t.entrySet().parallelStream().forEach(e -> {
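One caveat with this approach: once the entries are processed by a parallel stream, adding to a shared ArrayList from inside forEach is no longer thread-safe. Separately, the function passed to collectingAndThen is applied once to the finished map, so any parallelism inside aggregate has to come from aggregate itself. The following is a sketch under those assumptions, not a drop-in replacement. Emp is a hypothetical stand-in for the question's Employee class, and the per-group results are gathered with map and collect instead of a shared list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch class; names are illustrative only.
class ParallelAggregateSketch {

    // Minimal stand-in for the question's Employee class, kept immutable for simplicity.
    static final class Emp {
        final String fName;
        final double salary;

        Emp(String fName, double salary) {
            this.fName = fName;
            this.salary = salary;
        }
    }

    // Builds one Emp per group with the summed salary. The entries go through a
    // parallel stream, and collect() assembles the result list safely.
    static List<Emp> aggregate(Map<String, List<Emp>> groups) {
        return groups.entrySet()
                .parallelStream()
                .map(entry -> new Emp(
                        entry.getKey(),
                        entry.getValue().stream().mapToDouble(e -> e.salary).sum()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Emp> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            list.add(new Emp("A" + i, i * 100));
        }
        // Same shape as the question: group by first name, then aggregate the map.
        List<Emp> result = list.parallelStream()
                .collect(Collectors.collectingAndThen(
                        Collectors.groupingBy((Emp e) -> e.fName),
                        ParallelAggregateSketch::aggregate));
        System.out.println("result = " + result.size());
        System.out.println(result.stream().mapToDouble(e -> e.salary).sum());
    }
}
```

Whether this is actually faster depends on the size of the map and the cost of the work per group. For small inputs, the overhead of splitting work across the common pool can outweigh the gain.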
