Java生产者-消费者-停止消费者线程



我有这段代码,我想要一个好的方法来停止使用者线程:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
public class Foo {
private final Queue<Object> queue;
private final AtomicBoolean doneReading;
private final int numberOfThreads = 4, N = 100;
public Foo() {
queue = new ArrayDeque<>();
doneReading = new AtomicBoolean(false);
}
public void execute() throws InterruptedException {
Thread[] threads = new Thread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
threads[i] = new Thread(() -> {
try {
synchronized (queue) {
while (!doneReading.get() || !queue.isEmpty()) {
if (queue.isEmpty()) {
queue.wait();
if (!queue.isEmpty()) {
Object element = queue.remove();
// Do stuff
}
}
else {
Object element = queue.remove();
// Do stuff
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
threads[i].start();
}
for (int i = 0; i < N; i++) {
synchronized (queue) {
queue.add(new Object());
queue.notifyAll();
}
}
doneReading.set(true);
synchronized (queue) {
queue.notifyAll();
}
for (Thread thread : threads) {
thread.join();
}
}
}

基本上,当我读取了所有需要处理的数据时,我希望使用者线程停止。我尝试了while(!doneReading.get(((,但这并不能保证队列中没有任何剩余项。我补充道!queue.isEmpty((,但在这种情况下,一些线程会继续等待,即使它们不会收到任何通知。所以我设法再次调用notifyAll((。这似乎确实有效。我还想过在队列中添加一个null,每当消费者读取到null时,它就会退出。哪种方法更好,或者有更好的想法吗?

一种常见的方法是;毒丸";。在队列中放入一个特殊值,该值在读取时会杀死使用者线程。这使他们能够处理所有的值,并且在读取超过最终值并读取毒丸之前不会停止。

更多信息:https://java-design-patterns.com/patterns/poison-pill/

我也喜欢这些网站,它们经常有关于Java编程的深思熟虑的信息:

https://mkyong.com/java/java-blockingqueue-examples/

https://www.baeldung.com/java-blocking-queue

class LimitedQueue<T> {
ArrayDeque<T>  queue = new ArrayDeque<>();
boolean done = false;
synchronized void add (T item) {
queue.add(item);
notifyAll();
}
synchronized void done() 
done=true;
notifyAll();
}
// most complex method
// waits until next item or done signal is put
synchronized boolean isDone() {
for (;;) {
if (!queue.isEmpty(){
return false;
}
if (done) {
return true;
}
wait();
}
}
syncronized T remove() {
return deque.remove();
}
}
LimitedQueue<Object> queue = new LimitedQueue<>();
class ConsumerThread extends Thread {

public void run(){
while (!queue.isDone()) {
Object element = queue.remove();
// do stuff
}
}
}
class ProducerThread extends Thread {
public void run() {
for (int i = 0; i < N; i++) ,{
queue.add(new Object());
}
queue.done();
}
}

最新更新