阻塞队列和多线程消费者,如何知道何时停止



我有一个单线程生产者,它创建一些任务对象,然后将其添加到ArrayBlockingQueue(固定大小(中。

我还启动了一个多线程使用者。这是构建为固定线程池(Executors.newFixedThreadPool(threadCount);(。然后,我将一些 ConsumerWorker 意图提交到这个 threadPool,每个 ConsumerWorker 都有一个对上述 ArrayBlockingQueue 实例的引用。

每个这样的工作线程将在队列上执行take()并处理任务。

我的问题是,让工人知道何时没有更多工作要做的最佳方法是什么。换句话说,我如何告诉工作线程生产者已完成添加到队列,从这一点开始,每个工作线程在看到队列为空时都应该停止。

我现在得到的是一个设置,我的生产者使用回调初始化,当他完成工作(将东西添加到队列(时触发。我还保留了我创建并提交给 ThreadPool 的所有 ConsumerWorkers 的列表。当生产者回调告诉我生产者已完成时,我可以告诉每个工作人员。此时,他们应该继续检查队列是否不为空,当队列变为空时,他们应该停止,从而允许我优雅地关闭 ExecutorService 线程池。是这样的

public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}
@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是我有一个明显的竞争条件,有时生产者会完成,发出信号,而 ConsumerWorkers 会在消耗队列中的所有内容之前停止。

我的问题是同步它以使一切正常的最佳方法是什么?我是否应该同步检查生产者是否正在运行的整个部分以及队列是否为空,并在一个块(在队列对象上(中从队列中获取一些东西?我应该只同步 ConsumerWorker 实例上isRunning布尔值的更新吗?还有其他建议吗?

更新,这是我最终使用的工作实现:

public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1); 
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}
@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这在很大程度上受到JohnVint下面回答的启发,只有一些小的修改。

=== 由于@vendhan的评论而更新。

感谢您的观察。你是对的,这个问题中的第一个代码片段(除其他问题外(具有while(isRunning || !inputQueue.isEmpty())没有真正意义的代码片段。

在我实际的最终实现中,我做了一些更接近您替换"||"的建议(或(与"&&"(和(,从某种意义上说,每个工人(消费者(现在只检查他从列表中得到的元素是否是毒丸,如果是,则停止(所以理论上我们可以说工人必须正在运行并且队列不能为空(。

您应该继续从队列中take()。 你可以用毒丸告诉工人停下来。 例如:

private final Object POISON_PILL = new Object();
@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}

我会向工人发送一个特殊的工作包,以指示他们应该关闭:

public class ConsumerWorker implements Runnable{
private static final Produced DONE = new Produced();
private BlockingQueue<Produced> inputQueue;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}
@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

要停止工作线程,只需将ConsumerWorker.DONE添加到队列中即可。

在尝试从队列中检索元素的代码块中,请使用 poll(time,unit) 而不是 take()

try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 

通过指定适当的超时值,您可以确保线程不会在不幸的序列

  1. isRunning是真的
  2. 队列变为空,因此线程进入阻塞等待(如果使用take()
  3. isRunning设置为 false

我们不能使用 CountDownLatch 来做到这一点,其中大小是生产者中的记录数。每个消费者都会在处理记录后countDown。当所有任务完成时,它跨越了awaits()方法。然后停止所有消费者。由于所有记录都已处理。

您可以使用许多策略,但一个简单的策略是拥有一个指示作业结束的任务子类。生产者不直接发送此信号。相反,它会将此任务子类的实例排队。当您的某个使用者完成此任务并执行它时,这会导致发送信号。

我不得不使用多线程生产者和多线程消费者。我最终得到了一个Scheduler -- N Producers -- M Consumers方案,每个两个都通过一个队列进行通信(总共两个队列(。调度程序用生成数据的请求填充第一个队列,然后用 N 个"毒丸"填充它。有一个活跃生产者计数器(原子int(,最后一个收到最后一个毒丸的生产者将M毒丸发送到消费者队列。

最新更新