向流添加并行会导致空指针异常



我正在尝试了解Java流。我的理解是,它们提供了一种简单的方法来并行化行为,而且并非所有操作都受益于并行化,但您始终可以选择通过将 .parallell(( 拍打到现有流上来做到这一点。在某些情况下,这可能会使流变慢,或者在末尾以不同的顺序返回元素等,但您始终可以选择并行化流。这就是为什么当我改变这种方法时我感到困惑的原因:

public static List<Integer> primeSequence() {
List<Integer> list = new LinkedList<Integer>();
IntStream.range(1, 10)
.filter(x -> isPrime(x))
.forEach(list::add);
return list;
}
//returns {2,3,5,7}

对此:

public static List<Integer> primeSequence() {
List<Integer> list = new LinkedList<Integer>();
IntStream.range(1, 10).parallel()
.filter(x -> isPrime(x))
.forEach(list::add);
return list;
}
//throws NullPointerException();

我认为除非另有说明,否则所有流都是串行的,并且 parallel(( 只是制作然后并行执行。我在这里错过了什么?为什么它会引发异常?

初始primeSequence方法实现存在一个重大问题 - 将流迭代与外部列表修改混合在一起。您应该避免以这种方式使用流,否则您将面临很多问题。就像你描述的那个。如果你看一下add(E element)方法是如何实现的,你会看到这样的东西:

public boolean add(E e) {
this.linkLast(e);
return true;
}
void linkLast(E e) {
LinkedList.Node<E> l = this.last;
LinkedList.Node<E> newNode = new LinkedList.Node(l, e, (LinkedList.Node)null);
this.last = newNode;
if (l == null) {
this.first = newNode;
} else {
l.next = newNode;
}
++this.size;
++this.modCount;
}

如果在示例中使用CopyOnWriteArrayList而不是LinkedList,则不会引发NullPointerException- 只是因为CopyOnWriteArrayList使用锁定进行多线程执行同步:

public boolean add(E e) {
ReentrantLock lock = this.lock;
lock.lock();
boolean var6;
try {
Object[] elements = this.getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
this.setArray(newElements);
var6 = true;
} finally {
lock.unlock();
}
return var6;
}

但这仍然不是利用并行流的最佳方式。

使用流 API 的正确方法

请考虑对代码进行以下修改:

public static List<Integer> primeSequence() {
return IntStream.range(1, 10)
.parallel()
.filter(x -> isPrime(x))
.boxed()
.collect(Collectors.toList());
}

我们不是修改一些外部列表(任何类型的(,而是收集结果并返回最终列表。您可以使用.stream()方法将任何列表转换为流,并且不必担心初始列表 - 您将应用于该列表的所有操作都不会修改输入,结果将是输入列表的副本。

我希望它有所帮助。

最新更新