我试图使用pthreads和信号量来解决一个多生产者-消费者问题,但它总是停留在最后一次消费和停止。它将有NO_ITEMS项目,并假设缓冲区的大小为buffer_size
下面是我当前的代码。
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100
using namespace std;
void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();
sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;
stack<int> items;
static int count = 0;
int main()
{
sem_init(&fillCount, 0, 0);
sem_init(&emptyCount, 0, BUFFER_SIZE);
pthread_mutex_init(&mutex, nullptr);
pthread_t p1, c1, c2, c3;
pthread_create(&p1, nullptr, thread_producer, nullptr);
pthread_create(&c1, nullptr, thread_consumer, nullptr);
pthread_create(&c2, nullptr, thread_consumer, nullptr);
pthread_create(&c3, nullptr, thread_consumer, nullptr);
pthread_join(p1, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
sem_destroy(&fillCount);
sem_destroy(&emptyCount);
pthread_mutex_destroy(&mutex);
return 0;
}
void* thread_consumer(void* args) {
while (count < NO_ITEMS) {
sem_wait(&fillCount);
pthread_mutex_lock(&mutex);
if (!items.empty() && count < NO_ITEMS - 1) {
removeItem();
}
count++;
pthread_mutex_unlock(&mutex);
sem_post(&emptyCount);
}
return nullptr;
}
void* thread_producer(void* args) {
for (int i = 0; i < NO_ITEMS; i++) {
sem_wait(&emptyCount);
pthread_mutex_lock(&mutex);
addItem(i);
// sleep(1);
pthread_mutex_unlock(&mutex);
sem_post(&fillCount);
}
return nullptr;
}
void addItem(int i) {
cout << "Produced: " << i << endl;
items.push(i);
}
void removeItem() {
cout << "Consumed: " << items.top() << endl;
items.pop();
}
这是输出的一部分:
Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt
逻辑有缺陷
您的代码存在逻辑问题。假设NO_ITEMS
是100,并且到目前为止已经消耗了99。让两个使用者线程在这一点上到达while
循环的顶部,并假设两者都将count
读取为99(但请参见下文(,因此进入循环的主体。两个使用者都会在sem_wait()
上阻塞,但最多还有一个项目要生成,因此生产者最多会再次增加信号量,从而使至少一个使用者无限期地阻塞。
未定义的行为
此外,thread_consumer()
函数包含一个数据竞赛,使程序的行为未定义。具体地,在while
条件下对共享变量count
的读取没有正确地同步。尽管不能可靠地预测UB将如何表现(否则它不会"未定义"(,但非同步访问表现一个线程的明显故障以查看其他线程的共享变量更新是相当常见的。这样的失败模式可以解释你观察到的特定行为。
很可能,对这个同步问题的正确修复也会修复逻辑问题。
解决方案
有多种可能的解决方案。以下是一些有前景的:
-
信号灯不是特别适合这个问题。无论如何,您都需要一个互斥锁,而它通常用于发出信号的对应对象是一个条件变量。我会将两个信号量转换为两个(或者可能只有一个(普通整数变量,并在生产者和消费者中使用标准的互斥+CV模式。这将包括为消费者中
count
的读取添加互斥保护。 -
另一方面,如果你有义务使用信号量,那么你可以
- 为消费者读取
count
添加适当的互斥保护 - 在成功减少信号量后,一定要保留使用者的测试,以确定他们是否真的可以使用项目
- 在加入生产者线程之后但在尝试加入消费者之前,让主程序向
fillCount
发布两次(消费者数量-1次(。这将解锁任何认为自己可以消费某个商品,但在另一个消费者消费完最后一个商品后仍在等待的消费者
- 为消费者读取
-
或者,您可以使用混合:保留
emptyCount
信号量来限制在任何给定时间等待的项目数量(而不是为此切换到CV(,但切换到互斥+CV模式来管理消费者。