为多个并行流保持一个计数器更新



试图维护一个(同步的(计数器来跟踪List.parallelStream。 这是代码:

/**
* Testing the use of AtomicInteger in List.parallelStream()
*/
public void UsingParallelStream() {
/*Create the counter */
AtomicInteger atomicInteger = new AtomicInteger(0);
/*Create and populate the list */
List<Integer> numList = new ArrayList<Integer>();
int num = 1;
for (int i = 0; i < 1000; i++) {
numList.add(num);
num ++;
}
/*Test the consistency of the counter*/
numList.parallelStream().forEach(number -> {
atomicInteger.getAndIncrement();
executeTest(atomicInteger,numList);
});
}
private void executeTest(AtomicInteger atomicInteger, List<Integer> l) {
if (atomicInteger.intValue() % 100 == 0) {
System.out.println(l.get(atomicInteger.intValue() - 1));
}
}

以下是结果(如您所见,计数器未同步(: 100 200 300 300 401 501 501 701 800 901 800 1000

谢谢和布格斯, 大卫。

executeTest(atomicInteger,numList);中的操作不是原子操作。

if (atomicInteger.intValue() % 100 == 0) {
System.out.println(l.get(atomicInteger.intValue() - 1));
}

当一个线程发现atomicInteger.intValue() % 100 == 0为真,并尝试执行System.out.println(l.get(atomicInteger.intValue() - 1));时,其他线程可能已经更新了该值。

如何解决?使用局部变量:

numList.parallelStream().forEach(number -> {
int local = atomicInteger.getAndIncrement();
executeTest(local,numList);
});

private void executeTest(int  local, List<Integer> l) {
if (local % 100 == 0) {
System.out.println(l.get(local - 1));
}
}

您需要确保每个线程在不同的值上运行。此值只能找到一次,并且只能通过返回值getAndIncrement找到。对原子整数的每个进一步读取操作都可能返回一个新值,该值属于另一个线程。

numList.parallelStream().forEach(number -> {
var value = atomicInteger.getAndIncrement();
executeTest(value, numList);
});
private void executeTest(int value, List<Integer> l) {
if (value % 100 == 0) {
System.out.println(l.get(value - 1));
}
}

先递增,然后再读取时,多个线程可能使用相同的值。大气意味着,每个操作都是原子的。这并不意味着多个操作在同一线程中是原子的。

相关内容

最新更新