生产者/消费者:任何生产者/消费者都不应阻止其他生产者/消费者



我正在设计一个系统,其中将有n生产者和m消费者,其中n和m是数字,n != m。

我想以这样的方式设计系统,

  1. 任何生产者在生产时都不应阻止其他生产者
  2. 任何消费者在消费时都不应阻止其他消费者
  3. 生产者和消费者在生产/消费时相互阻隔

例如:在java中,如果我使用同步关键字,那么它将阻止相应的调用者。

不确定我应该使用什么数据结构和算法来实现这个系统。

有人可以为此提供帮助/指示吗?

你可能想要类似ConcurrentLinkedQueue的东西。这个想法是你创建一个队列。n 个生成者中的每个生产者都将工作项添加到队列中,m 个使用者中的每个使用者从队列中读取工作项。制片人很简单:

while not done
    create work item
    add work item to queue

消费者同样简单:

while not done
    get next work item from queue
    process work item

ConcurrentLinkedQueue 方法处理添加和删除项,并根据需要与其他生成者和使用者同步。

唯一真正的缺点是您必须轮询队列以查看是否有项目。因此,您可能需要一个自动重置事件,每当将项目添加到队列时都会被触发。例如:

add work item to queue
set ItemAvailable event

使用者将轮询队列,如果没有可用项目,则等待事件:

while not done
    while ((item = queue.poll) == null)
        wait on ItemAvailable event
    process item

看看我链接的例子。使用起来真的不难。

根据你需要做多少繁重的工作,以及你的解决方案需要多好扩展,RxJava有一个陡峭的学习曲线,但一旦你克服了这一点,它可能是最优雅,扩展和性能的解决方案。

在不同的线程中运行所有生产者,将它们与Merg()组合,将消费者移动到带有.observeOn(Scheduler.newThread())的未绑定缓冲区上自己的线程。


如果你需要一些在多个系统上并行运行良好的东西,看看mapreduce。

如果你需要完全另一端的东西(简单的东西),只需坚持使用ConcurrentQueue。这不支持多播,但至少解决了生产者方面的问题。

你想要一种方法,其中每个操作都是原子的和不间断的,所以是的,在我看来,最好的方法是在设置锁的方法上使用同步修饰符。

另一个有趣的方法是使用原子变量 -> http://baptiste-wicht.com/posts/2010/09/java-concurrency-atomic-variables.html

这取决于您在这些生产者/消费者结构中的数据。

使用 wait() 和

notify() 进行线程通信,可以创建n生产者和m消费线程

class Q{
 int n;
 boolean value=false;
 synchronized int get() {
 if(!value)
 try  {  wait();   }
 catch(InterruptedException e)
 { System.out.println("thread interrupted"); }
 System.out.println("Got : "+n);
 value=false;
 notify();
 return n;}
 synchronized void put(int n) {
 if(value)
 try { wait();}
 catch(InterruptedException e)
 {  System.out.println("thread interrupted"); }
 this.n=n;
 value=true;
 System.out.println("Put : "+n);
 notify();}}
 class Producer implements Runnable{
 Q q;
 Producer(Q q){
 this.q=q;
  new Thread(this,"Producer").start();}
 public void run(){
 int i=0;
 while(true)
 {
 q.put(i++);}}
 }
 class Consumer implements Runnable{
 Q q;
  Consumer(Q q) {
  this.q=q;
  new Thread(this,"Consumer").start();}
  public void run(){
   while(true)
  {
  q.get();
   }}}
   class PCFixed
  {
   public static void main(String ar[])
  {
   Q q=new Q();
   new Producer(q);
   new Consumer(q);
   System.out.println("PRESS CONTROL-C TO STOP");
   }
   }

它进入无穷大,根据您的要求进行更改

最新更新