我正在设计一个系统,其中将有n
生产者和m
消费者,其中n和m是数字,n != m。
我想以这样的方式设计系统,
- 任何生产者在生产时都不应阻止其他生产者
- 任何消费者在消费时都不应阻止其他消费者
- 生产者和消费者在生产/消费时相互阻隔
例如:在java中,如果我使用同步关键字,那么它将阻止相应的调用者。
我不确定我应该使用什么数据结构和算法来实现这个系统。
有人可以为此提供帮助/指示吗?
你可能想要类似ConcurrentLinkedQueue的东西。这个想法是你创建一个队列。n 个生成者中的每个生产者都将工作项添加到队列中,m 个使用者中的每个使用者从队列中读取工作项。制片人很简单:
while not done
create work item
add work item to queue
消费者同样简单:
while not done
get next work item from queue
process work item
ConcurrentLinkedQueue
方法处理添加和删除项,并根据需要与其他生成者和使用者同步。
唯一真正的缺点是您必须轮询队列以查看是否有项目。因此,您可能需要一个自动重置事件,每当将项目添加到队列时都会被触发。例如:
add work item to queue
set ItemAvailable event
使用者将轮询队列,如果没有可用项目,则等待事件:
while not done
while ((item = queue.poll) == null)
wait on ItemAvailable event
process item
看看我链接的例子。使用起来真的不难。
根据你需要做多少繁重的工作,以及你的解决方案需要多好扩展,RxJava有一个陡峭的学习曲线,但一旦你克服了这一点,它可能是最优雅,扩展和性能的解决方案。
在不同的线程中运行所有生产者,将它们与Merg()
组合,将消费者移动到带有.observeOn(Scheduler.newThread())
的未绑定缓冲区上自己的线程。
如果你需要一些在多个系统上并行运行良好的东西,看看mapreduce。
如果你需要完全另一端的东西(简单的东西),只需坚持使用ConcurrentQueue。这不支持多播,但至少解决了生产者方面的问题。
你想要一种方法,其中每个操作都是原子的和不间断的,所以是的,在我看来,最好的方法是在设置锁的方法上使用同步修饰符。
另一个有趣的方法是使用原子变量 -> http://baptiste-wicht.com/posts/2010/09/java-concurrency-atomic-variables.html
这取决于您在这些生产者/消费者结构中的数据。
notify() 进行线程通信,可以创建n
生产者和m
消费线程
class Q{
int n;
boolean value=false;
synchronized int get() {
if(!value)
try { wait(); }
catch(InterruptedException e)
{ System.out.println("thread interrupted"); }
System.out.println("Got : "+n);
value=false;
notify();
return n;}
synchronized void put(int n) {
if(value)
try { wait();}
catch(InterruptedException e)
{ System.out.println("thread interrupted"); }
this.n=n;
value=true;
System.out.println("Put : "+n);
notify();}}
class Producer implements Runnable{
Q q;
Producer(Q q){
this.q=q;
new Thread(this,"Producer").start();}
public void run(){
int i=0;
while(true)
{
q.put(i++);}}
}
class Consumer implements Runnable{
Q q;
Consumer(Q q) {
this.q=q;
new Thread(this,"Consumer").start();}
public void run(){
while(true)
{
q.get();
}}}
class PCFixed
{
public static void main(String ar[])
{
Q q=new Q();
new Producer(q);
new Consumer(q);
System.out.println("PRESS CONTROL-C TO STOP");
}
}
它进入无穷大,根据您的要求进行更改