我正在阅读一些关于Java中生产者消费者实现的内容。 大多数示例的在线代码都显示了具有以下基本对象的解决方案:
用作数据管道的阻塞队列
n 个将消息推送到队列的生产者数量
n 个正在轮询队列中数据的使用者
通常,使用者被提交到一个线程池,该线程池完成实现。
我调查的另一个选项,几乎没有在线表示,只是将任务作为可运行对象提交到 FixedThreadPool。
在此实现中,用于处理消息的业务逻辑在 Runnable 对象的run()
方法中实现。当我们有一条新消息想要处理时,我们可以简单地将这种类型的新任务提交给FixedThreadPool
,然后......就是这样。正在运行的线程数由FixedThreadPool
实现以及轮询消息的逻辑管理,留给我们实现的只是用例的业务逻辑。
谁能解释为什么这个解决方案被忽视了?
当 java 语言已经为我们实现时,有什么具体原因我们需要使用阻塞队列和轮询吗?
public class ProducerConsumerExample{
private ExecutorService pool;
public ProducerConsumerExample(int numberOfThreads){
this.pool = Executors.newFixedThreadPool(numberOfThreads);
}
public void submit(MessageObject msg){
pool.submit(new MessagePrinter(msg));
}
}
public class MessagePrinter implements Runnable{
private MessageObject msg;
public MessagePrinter(MessageObject msg){
this.msg = msg;
}
@Override
public void run() {
//only need to implement logic that is releavent for our use case
System.out.println("Message recieved " + msg.toString());
}
}
public static void main(String[] args){
ProducerConsumerExample ex = new ProducerConsumerExample(5);
for(int i=0;i<WHATEVER;i++){
ex.submit(new MessageObject());
}
}
您的方法没有错,只是不清楚Executors.newFixedThreadPool().
创建的池中使用了什么队列 你确定如果任务数量变得相当可观,你就不能RejectedExecutionException
吗?
因此,最好使用具有预定义大小的任务队列的新显式创建。 查看Executors.newFixedThreadPool(int)
的源代码。
实际上,最好在生产者-消费者应用程序中使用线程池。从 Java 并发实践:
Executor
基于生产者-消费者模式,其中活动 提交任务的是生产者(生产工作单元 完成),执行任务的线程是使用者(消费 这些工作单元)。使用遗嘱执行器通常是最简单的途径 在应用程序中实现生产者-消费者设计。
选择实现时应考虑的事项:
- 耦合:生产者和消费者对彼此以及将他们连接在一起的渠道了解多少?
- 底层队列实现:哪种类型的
Queue
最适合这项工作? - 生命周期:如果
ThreadPool
损坏或关闭会发生什么?
线程 - 管理和饥饿:您在管理线程方面参与多少?长寿命与短寿命对象?