C++多线程生产者-消费者问题



我用条件变量编写了一段将生产者和消费者相乘的代码。 即使我只有一个生产者和一个消费者,它也不起作用。 生产者和消费者都应该运行 while(true(。 当我运行代码时,它卡在大约 50% 的运行中。 我想它因过度等待而陷入僵局。 我没有成功调试它卡在哪里以及如何解锁 conds。 根据要求,我必须创建带有等待,信号和广播的代码。

如果队列已满,则生产者正在等待。 如果队列为空,则使用者正在等待。

void WaitableQueue::enqueue(size_t a_item)
{
(m_cond.getMutex()).lock();
while(m_itemsCounter==m_capacity && !m_isBeingDestroyed)
{
++m_numberOfWaiting;
m_cond.wait();
--m_numberOfWaiting;
}
std::cout<<"enqueue "<<a_item<<"n";
m_queue.push(a_item);
++m_itemsCounter;
++m_numbOfProduced;
if(m_isBeingDestroyed)
{
m_cond.broadcast(); 
}
(m_cond.getMutex()).unlock();
m_cond.broadcast();
}
void WaitableQueue::dequeue()
{
(m_cond.getMutex()).lock();
while(m_itemsCounter==0 && !m_isBeingDestroyed)
{
++m_numberOfWaiting;
std::cout<<"Waitingn";
m_cond.wait();
std::cout<<"Done waitingn";
--m_numberOfWaiting;
}
if (m_isBeingDestroyed)
{
(m_cond.getMutex()).unlock();
m_cond.broadcast();
return;
}
std::cout<<"dequeue "<<m_queue.front()<<"n";
m_queue.pop();
--m_itemsCounter;
++m_numbOfConsumed;
(m_cond.getMutex()).unlock();
m_cond.broadcast();
}
void WaitableQueue::destroy()
{
(m_cond.getMutex()).lock();
m_isBeingDestroyed=true;
(m_cond.getMutex()).unlock();
}

void Producer::run()
{
for(size_t i=0;i<m_numOfItemsToProduce;++i)
{
usleep(m_delay);
size_t item=produce();
m_wq.enqueue(item);
}
}

Producer::produce() const
{
return rand()%m_numOfItemsToProduce;
}
void Consumer::run()
{
m_numOfProducersMutex.lock();
while(m_numOfProducers>0)
{
m_numOfProducersMutex.unlock();
usleep(m_delay);
m_wq.dequeue();
m_numOfProducersMutex.lock();
}
m_numOfProducersMutex.unlock();
}

int main()
{
size_t numProducers=1;
size_t numConsumers=3;
Mutex mutex;
ConditionalVariable cond(mutex);
WaitableQueue<size_t> wq(NUM_OF_ITEMS,cond);
std::vector<Producer<size_t>*> producerArray;
std::vector<Consumer<size_t>*> consumerArray;
Mutex numOfProducersMutex;
for(size_t i=0;i<numProducers;++i)
{
Producer<size_t>* tempP=new Producer<size_t>(wq,NUM_OF_ITEMS,DELAY);
producerArray.push_back(tempP);
}
for(size_t i=0;i<numConsumers;++i)
{
Consumer<size_t>* tempC=new Consumer<size_t>(wq,numProducers,numOfProducersMutex,DELAY);
consumerArray.push_back(tempC);
}
for(size_t i=0;i<numProducers;++i)
{
producerArray[i]->start();
}
for(size_t i=0;i<numConsumers;++i)
{
consumerArray[i]->start();
}
for(size_t i=0;i<numProducers;++i)
{
producerArray[i]->join();
numOfProducersMutex.lock();
--numProducers;
numOfProducersMutex.unlock();
}
usleep(100);
//tell the consumers stop waiting
wq.destroy();
for(size_t i=0;i<numConsumers;++i)
{
consumerArray[i]->join();
}
for(size_t i=0;i<numProducers;++i)
{
delete producerArray[i];
}
for(size_t i=0;i<numConsumers;++i)
{
delete consumerArray[i];
}
}

它适用于大约 50% 的跑步。 在其他 50% 中,它被卡住了。

要使用条件变量解决生产者-消费者问题,首先需要了解有界缓冲区问题。

在这里检查使用 C++ 中的条件变量实现线程安全缓冲区队列: https://codeistry.wordpress.com/2018/03/08/buffer-queue-handling-in-multithreaded-environment/

您可以使用此缓冲区队列作为构建块来解决多生产者使用者问题。 请在此处查看如何使用线程安全缓冲队列来解决C++中的生产者-消费者问题: https://codeistry.wordpress.com/2018/03/09/unordered-producer-consumer/

> 你已经发现了另一个例子,说明C++如何从一个概念上简单的问题中提出一个难题。

您似乎希望一个或多个生产者生成相同数量的值,并让一组使用者读取和处理这些值。您似乎还希望生产者的数量等于使用者的数量,同时允许该数量(生产者和使用者(可配置。

使用 Ada 时,这个问题非常简单,它在设计时考虑了并发性。

第一个文件是定义生产者和消费者任务类型的 Ada 包规范。

generic
Items_To_Handle : Positive;
package Integer_Prod_Con is
task type Producer;
task type Consumer;
end Integer_Prod_Con;

泛型参数很像模板参数。在这种情况下,作为泛型参数传递的值必须是正整数。 该软件包的实现如下。

with Ada.Containers.Synchronized_Queue_Interfaces;
with Ada.Containers.Unbounded_Synchronized_Queues;
with Ada.Text_Io; use Ada.Text_IO;
package body Integer_Prod_Con is
package Int_Interface is new Ada.Containers.Synchronized_Queue_Interfaces(Integer);
package Int_Queue is new Ada.Containers.Unbounded_Synchronized_Queues(Queue_Interfaces =>Int_Interface);
use Int_Queue;
The_Queue : Queue;
--------------
-- Producer --
--------------
task body Producer is
begin
for Num in 1..Items_To_Handle loop
The_Queue.Enqueue(Num);
delay 0.010;
end loop;
end Producer;
--------------
-- Consumer --
--------------
task body Consumer is
Value : Integer;
begin
for Num in 1..Items_To_Handle loop
The_Queue.Dequeue(Value);
Put_Line(Value'Image);
delay 0.010;
end loop;
end Consumer;
end Integer_Prod_Con;

该包采用预定义的通用包,实现无限队列作为缓冲区。这允许队列根据程序的需要增长和缩小。每个生产者任务将 1 到 Items_To_Handle 的整数值排队,每个使用者从队列中取消排队并输出相同数量的元素。

该程序的主要过程是:

with Integer_Prod_Con;
procedure Int_Queue_Main is
PC_Count : constant := 3;
package short_list is new Integer_Prod_Con(10);
use short_List;
Producers : Array(1..PC_Count) of Producer;
Consumers : Array(1..PC_Count) of Consumer;
begin
null;
end Int_Queue_Main;

该程序的输出是:

1
1
1
2
2
2
3
3
3
4
4
4
5
5
5
6
6
6
7
7
7
8
8
8
9
9
9
10
10
10

最新更新