C++ 多个使用者线程卡在条件变量上



我正在C++中制作一个生产者,多个消费者程序。我首先调用使用者线程,然后将元素添加到数组中。 一切正常,但最终使用者线程没有加入,因为它们被困在等待条件变量并且程序冻结。

我认为问题是线程在循环中不断被调用,因为 currentSize 不受保护,它们无法退出条件变量,但我不知道如何解决它。

struct Item {
public:
string name;
int time;
double height;
};
struct Monitor {
private:
Item items[12];
int currentSize;
bool finished;
mutex lock;
condition_variable cv;
public:
Monitor() {
finished = false;
currentSize = 0;
}
void put(Item item) {
unique_lock<mutex> guard(lock);
cv.wait(guard, [&] { return (currentSize < 12); });
items[currentSize] = item;
currentSize++;
cv.notify_all();
}
Item get() {
unique_lock<mutex> guard(lock);
cv.wait(guard, [&] { return (currentSize > 0); });
Item item = items[currentSize - 1];
currentSize--;
return item;
}
bool get_finished() {
return finished;
}
void set_finished() {
finished = true;
}
int get_size() {
return currentSize;
}
};
int main() {
vector<Item> items = read_file(file);
Monitor monitor;
vector<thread> threads;
vector<Item> results;
for (int i = 0; i < 4; i++) {
threads.emplace_back([&] {
while (!monitor.get_finished()) {
if (monitor.get_size() > 0) {
Item item = monitor.get();
results.push_back(item);
}
}
});
}
for (int i = 0; i < items.size(); i++) {
monitor.put(items[i]);
}
monitor.set_finished();
for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
return 0;
}

为什么使用者线程阻塞?

我已经测试了您的代码,结果证明它是put()方法上的生产者线程阻塞。为什么?

想象一下以下场景:向量items中有 13 个项目。

主线程(生产者(愉快地加载前 12 个项目,并等待cvcurrentSize低于 12。

使用者线程收到通知,并愉快地使用前 12 个项目,然后等待cvcurrentSize大于 0。

但是等等!现在每个人都在等待一些事情,没有人通知。因此,所有线程都会阻塞。您需要在currentSize低于 12 时通知生产者。

我注意到了一些问题。 使成员变量原子化,notify_all获取 API 中。但是,也存在逻辑错误。假设您当前有 4 个线程正在运行,5 个项目在队列中。在这一点上,假设每个线程都能够从队列中取出一个线程,现在队列中有 4 个线程,只有一个项目。其中一个线程取出最后一个,现在那里有 0 个项目,但其他三个线程仍在等待条件变量。因此,一个解决方案是,如果最后一项已出,则应通知每个线程,如果没有其他elemnet,则应从API返回。

#include <iostream>
#include <vector>
#include <condition_variable>
#include <thread>
#include <algorithm>
#include <atomic>
using namespace std;
using Item = int;
struct Monitor {
private:
Item items[12];
std::atomic<int> currentSize;
std::atomic<bool> finished;
mutex lock;
condition_variable cv;
public:
Monitor() {
finished = false;
currentSize = 0;
}
void put(Item item) {
unique_lock<mutex> guard(lock);
cv.wait(guard, [&] { return (currentSize < 12); });
items[currentSize] = item;
currentSize++;
cv.notify_all();
std::cerr << "+ " << currentSize << std::endl ;
}
Item get() {
unique_lock<mutex> guard(lock);
cv.wait(guard, [&] { return (currentSize >= 0 ); });
Item item;
if (currentSize > 0 ){      
currentSize--;
item = items[currentSize];
cv.notify_all();
std::cerr << "- " << currentSize << std::endl ;
}
return item;
}
bool get_finished() {
return finished;
}
void set_finished() {
finished = true;
}
int get_size() {
return currentSize;
}
};
int main() {
vector<Item> items(200);
std::fill ( items.begin() , items.end(), 100);
Monitor monitor;
vector<thread> threads;
vector<Item> results;
for (int i = 0; i < 10; i++) {
threads.emplace_back([&] {
while ( !monitor.get_finished() ) {
if (monitor.get_size() > 0) {
Item item = monitor.get();
results.push_back(item);
}
}
});
}
for (int i = 0; i < items.size(); i++) {
monitor.put(items[i]);
}
monitor.set_finished();
for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
return 0;
}

最新更新