改进的生产者-消费者算法



我正在处理一个修改后的生产者/消费者问题。然而,有一个竞争条件,我正在讨论最好的方法来解决它。可能有更干净的方法,我想知道是否有人做过类似的事情,如果可能的话,分享一个更好的解决方案。

使用队列作为正常的生产者/消费者启动。一个生产者线程从磁盘读取项目,并在共享队列中排队。然后,多个消费者线程尝试将项目从队列中取出以进行处理。然而,每个项目都有一个标签(如线程Id),必须与消费者标签相匹配。消费者线程查看队列的前面并检查项目的标记。如果它与消费者线程的标签不匹配,消费者线程必须进入睡眠状态并等待,直到队列的前面有一个与它的标签匹配的项目。有点令人困惑,但下面的伪代码有望澄清算法:

struct item
{
   // This is unique tag that only a specific consumer can consumer
    int    consumerTag; 
    // data for the consumer to consume
    void   *pData;
}
///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag to a specific consumer
while (item = read())
{
    lock(queue)
    if (queueNotFull)
    {
        enqueue(item);
    }
    else
    {
       // check front of the queue, notify worker.
       Sleep(); // Releases Queue Mutex upon entering
       // requires the mutex after it has been awaken
    }
    unlock(queue);
    wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is it like at thread id,
// each consumer has a unique tag
myTag = getThreadTAG()
while (true)
{
    lock (queue);
    if (queueNotEmpty)
    {
        item = queueFront()
        if (myTag == item->id)
        {
           // this item is for me, let's dequeue and process
           item = dequeue();
           process();
        }
        else
        {
           // This is not for me let's go to sleep
           Sleep(); // Releases Queue Mutex
          // re-acquire mutex
        }
    }
    else
    {
        Sleep();    // Releases Queue Mutex
       // re-acquire mutex
    }
    unlock (queue);
    wakeUpProducer();
}

然而,上面的算法存在问题。让我们考虑以下事件并假设如下:

项目。Tag =1意味着该物品只能由具有相同标签的消费者消费。我将它表示为消费者。标签= 1

  1. 生产者读取item.tag=1并排队
  2. 生产者唤醒所有消费者线程(consumer.tag=1, consumer.tag=2等)…我们现在都醒了,正在检查队列的前面)
  3. 生产者读取item.tag=2并排队
  4. 生产者唤醒所有消费者线程
  5. 队列现在有[item.tag=1, item.tag=2]
  6. consumer.tag=2 wakes up and peek at the front of the queue, but项。tag=1,不匹配consumer.tag=1;因此,它进入睡眠状态。consumer.tag=2正在睡觉。
  7. consumer.tag=1唤醒并偷看队列的前面,item.tag=1匹配consumer.tag=1。退出队列并通知生产者它可以消耗更多。
  8. 生产者完成数据读取并退出。现在队列有item.tag=2,而consumer.tag=2处于睡眠状态,永远不会消耗该数据。注意,可以有许多消费者。因此,最终许多消费者可能会进入睡眠状态,队列

我想在生产者线程的末尾添加一个循环,不断唤醒所有睡眠的线程,直到队列为空。

// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
     WakeUpAllConsumer();
     Sleep();
}

但是我相信一定有一个更优雅的方法来处理这个问题。有什么想法请告诉我

谢谢!

我之前遇到过类似的情况(在一个所有线程都可以处理所有项目的设置中),我使用的解决方案是在制作人完成读取数据时最后一次唤醒所有人,尽管不是那么优雅。
在这里,这实际上不起作用,因为如果队列中有第三个项目,那么该项目可能会被抛在后面。我的建议是以下两种方法之一:

  1. 唤醒所有的线程,每次消费者从队列中取出一个项目。这是我能想到的唯一办法确保不会有任何遗漏。

  2. 此模式只能在isProducerFinishedReading == true节省一个资源/时间的情况下进行。
  3. 将系统重新设计为有10个队列,然后当一个项目被添加到队列n中时,消费者线程n被唤醒。当它处理完元素后,它再次检查队列中是否有要处理的新项。在任何情况下,当读取完成时,生产者都应该检查所有队列的长度,并唤醒相应的线程。

希望有帮助。

编辑:

每当一个线程完成工作,它应该再次检查队列,如果项目有"他的",然后它做工作。如果一个线程可以唤醒其他线程,那么它应该唤醒相应的线程。

最新更新