多生产者-消费者问题一直停留在最后一次消费上



我试图使用pthreads和信号量来解决一个多生产者-消费者问题,但它总是停留在最后一次消费和停止。它将有NO_ITEMS项目,并假设缓冲区的大小为buffer_size

下面是我当前的代码。

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100
using namespace std;
void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();
sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;
stack<int> items;
static int count = 0;

int main()
{
sem_init(&fillCount, 0, 0);
sem_init(&emptyCount, 0, BUFFER_SIZE);
pthread_mutex_init(&mutex, nullptr);
pthread_t p1, c1, c2, c3;
pthread_create(&p1, nullptr, thread_producer, nullptr);
pthread_create(&c1, nullptr, thread_consumer, nullptr);
pthread_create(&c2, nullptr, thread_consumer, nullptr);
pthread_create(&c3, nullptr, thread_consumer, nullptr);
pthread_join(p1, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
sem_destroy(&fillCount);
sem_destroy(&emptyCount);
pthread_mutex_destroy(&mutex);
return 0;
}
void* thread_consumer(void* args) {
while (count < NO_ITEMS) {
sem_wait(&fillCount);
pthread_mutex_lock(&mutex);
if (!items.empty() && count < NO_ITEMS - 1) {
removeItem();
}
count++;
pthread_mutex_unlock(&mutex);
sem_post(&emptyCount);
}
return nullptr;
}
void* thread_producer(void* args) {
for (int i = 0; i < NO_ITEMS; i++) {
sem_wait(&emptyCount);
pthread_mutex_lock(&mutex);
addItem(i);
// sleep(1);
pthread_mutex_unlock(&mutex);
sem_post(&fillCount);
}
return nullptr;
}
void addItem(int i) {
cout << "Produced: " << i << endl;
items.push(i);
}
void removeItem() {
cout << "Consumed: " << items.top() << endl;
items.pop();
}

这是输出的一部分:

Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt

逻辑有缺陷

您的代码存在逻辑问题。假设NO_ITEMS是100,并且到目前为止已经消耗了99。让两个使用者线程在这一点上到达while循环的顶部,并假设两者都将count读取为99(但请参见下文(,因此进入循环的主体。两个使用者都会在sem_wait()上阻塞,但最多还有一个项目要生成,因此生产者最多会再次增加信号量,从而使至少一个使用者无限期地阻塞。

未定义的行为

此外,thread_consumer()函数包含一个数据竞赛,使程序的行为未定义。具体地,在while条件下对共享变量count的读取没有正确地同步。尽管不能可靠地预测UB将如何表现(否则它不会"未定义"(,但非同步访问表现一个线程的明显故障以查看其他线程的共享变量更新是相当常见的。这样的失败模式可以解释你观察到的特定行为。

很可能,对这个同步问题的正确修复也会修复逻辑问题。

解决方案

有多种可能的解决方案。以下是一些有前景的:

  1. 信号灯不是特别适合这个问题。无论如何,您都需要一个互斥锁,而它通常用于发出信号的对应对象是一个条件变量。我会将两个信号量转换为两个(或者可能只有一个(普通整数变量,并在生产者和消费者中使用标准的互斥+CV模式。这将包括为消费者中count的读取添加互斥保护。

  2. 另一方面,如果你有义务使用信号量,那么你可以

    • 为消费者读取count添加适当的互斥保护
    • 在成功减少信号量后,一定要保留使用者的测试,以确定他们是否真的可以使用项目
    • 在加入生产者线程之后但在尝试加入消费者之前,让主程序向fillCount发布两次(消费者数量-1次(。这将解锁任何认为自己可以消费某个商品,但在另一个消费者消费完最后一个商品后仍在等待的消费者
  3. 或者,您可以使用混合:保留emptyCount信号量来限制在任何给定时间等待的项目数量(而不是为此切换到CV(,但切换到互斥+CV模式来管理消费者。

相关内容

  • 没有找到相关文章

最新更新