如何实现多生产者多消费者单缓冲模型



有一个缓冲区、多个生产者和多个消费者。任何生产者(只有一个(都可以在缓冲区为空时写入缓冲区。任何使用者(只有一个(都可以在缓冲区已满时读取缓冲区。如何实现这样的模型?我尝试使用pthread_cond_wait和pthread-cond_signal。但我没有找到解决方案。

下面的多生产者多消费者单缓冲区实现是使用Ada实现的。受保护的对象是缓冲区。有两个生产者和两个消费者。受保护的对象隐式地处理所有的锁定变量和条件变量。

生产者任务类型、使用者任务类型和受保护对象都在Ada包中定义。

包装规格为:

-----------------------------------------------------------------------
-- Producer-consumer with bounded buffer
-----------------------------------------------------------------------
generic
Capacity : Positive;
package Bounded_PC is
task type Producer is
entry set_id(Id : in Positive);
entry Stop;
end Producer;
task type Consumer is
entry set_id(Id : in Positive);
entry Stop;
end Consumer;
end Bounded_PC;

包主体包含任务类型和共享缓冲区的实现。

with Ada.Text_IO; use Ada.Text_IO;
package body Bounded_PC is
subtype Index_T is Positive range 1..Capacity;
type Buf_Array is array (Index_T) of Integer;
------------
-- Buffer --
------------
protected Buffer is
Entry Write(Item : in Integer);
Entry Read(Item : out Integer);
private
Buf : Buf_Array;
Write_Index : Index_T := 1;
Read_Index  : Index_T := 1;
Count       : Natural := 0;
end Buffer;

protected body Buffer is
entry Write(Item : in Integer) when Count < Capacity is
begin
Buf(Write_Index) := Item;
Write_Index := (Write_Index mod Capacity) + 1;
Count := Count + 1;
end Write;
entry Read(Item : out Integer) when Count > 0 is
begin
Item := Buf(Read_Index);
Read_Index := (Read_Index mod Capacity) + 1;
Count := Count - 1;
end Read;
end Buffer;
--------------
-- Producer --
--------------
task body Producer is
Value : Integer := 0;
Me : Positive;
begin
accept Set_Id(Id : in Positive) do
Me := Id;
end Set_Id;
loop
select
accept Stop;
exit;
else
select
Buffer.Write(Value);
Put_Line("Producer" & Me'Image & " wrote" & Value'Image);
Value := Value + 1;
or
delay 0.001;
end select;
end select;
end loop;
end Producer;
--------------
-- Consumer --
--------------
task body Consumer is
Value : Integer;
Me : Positive;
begin
accept Set_Id(Id : in Positive) do
Me := Id;
end Set_Id;
loop
select
accept Stop;
exit;
else
select
Buffer.Read(Value);
Put_Line("Consumer" & Me'Image & " read" & Value'Image);
or
delay 0.001;
end select;
end select;
end loop;
end Consumer;
end Bounded_PC;

每个入口调用的条件都在受保护的主体中声明。

entry Write(Item : in Integer) when Count < Capacity
entry Read(Item : out Integer) when Count > 0

该程序的主要程序是:

with Bounded_PC;
procedure Main is
package Int_Pck is new Bounded_Pc(10);
use Int_Pck;
P1 : Producer;
P2 : Producer;
C1 : Consumer;
C2 : Consumer;
begin
P1.Set_Id(1);
P2.Set_Id(2);
C1.Set_Id(1);
C2.Set_Id(2);
delay 0.01;
P1.Stop;
P2.Stop;
delay 0.01;
C1.Stop;
C2.Stop;
end Main;

我找到了一个解决方案:

bool buffer_empty = true;
// consumer thread:
pthread_mutex_lock(&buffer_lock);
while (buffer_empty) {
pthread_cond_wait(&buffer_full, &buffer_lock);
}
// consume
buffer_empty = true;
pthread_mutex_unlock(&buffer_lock);

// producer thread:
pthread_mutex_lock(&buffer_lock);
while (!buffer_empty) {
pthread_cond_wait(&buffer_full, &buffer_lock);
}
// produce
buffer_empty = false;
pthread_mutex_unlock(&buffer_lock);
pthread_cond_signal(&buffer_full);

最新更新