如何创建生产者/使用者线程架构?



我有多个workerThreads和一个生产者Thread。当我运行下面的代码时,工作线程很少运行。我收到这样的输入;

workToDo Size: 1
workToDo Size: 2
...
workToDo Size: 514
91 removed  //First remove output

工作待办事项列表正在增加。有没有办法减少工作线程的运行间隔?我想我在这里做错了什么。

我的主要方法;

WorkerThread workerThread = new WorkerThread();
WorkerThread workerThread2 = new WorkerThread();
WorkerThread workerThread3 = new WorkerThread();
ProducerThread producerThread = new ProducerThread();
workerThread.producerThread = producerThread;
workerThread2.producerThread = producerThread;
workerThread3.producerThread = producerThread;
producerThread.start();
workerThread.start();
workerThread2.start();
workerThread3.start();

工作线程;

public class WorkerThread extends Thread {
ProducerThread producerThread;
@Override
public void run() {
while (true) {
synchronized (producerThread) {
try {
producerThread.wait();
System.out.println(producerThread.workToDo.remove(0) + " removed");
} catch (InterruptedException ex) {
}
}
}
}
}

生产者线程;

public class ProducerThread extends Thread {
List<Integer> workToDo = new ArrayList();
@Override
public void run() {
while (true) {
synchronized (this) {
workToDo.add((int)(Math.random() * 100));
System.out.println("workToDo Size: " + workToDo.size());
notifyAll();
}
}
}
}

工作待办事项列表正在增加。有没有办法减少工作线程的运行间隔?我想我在这里做错了什么。

如果没有看到更多的代码,很难完全回答,但当你的生产者线程可以比消费者处理它们更快地产生工作时,这似乎是一个典型的问题。 您应该使用有界BlockingQueue以便只对一定数量的请求进行排队,而不会用它们填满内存。

应考虑将ExecutorService类与有界队列一起使用。 例如:

// start a thread pool with 3 worker threads and a queue of 100
ExecutorService threadPool =
new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(100));
// to get the producer to _block_ instead of rejecting the job you need a handler
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// this will cause the producer to block until there is room in queue
executor.getQueue().add(r);
}
});

如果将线程计数增加到 4,则生产者线程也可能是在 threadPool 中运行的作业,然后只需将Runnable个作业提交到线程池,由 3 个工作线程处理。 如果 3 个繁忙,那么它将在ArrayBlockingQueue中对请求进行排队。 一旦 100 个(随意更改该数字)作业排队,生产者将阻止,直到另一个作业完成。

使用ExecutorServiceBlockingQueue意味着所有难以正确处理的等待和通知逻辑都会为您处理。

最新更新