在阅读了其他一些文章后,我知道我可以实现这样的c++阻塞队列:
template<typename T>
class BlockingQueue {
public:
std::mutex mtx;
std::condition_variable not_full;
std::condition_variable not_empty;
std::queue<T> queue;
size_t capacity{5};
BlockingQueue()=default;
BlockingQueue(int cap):capacity(cap) {}
BlockingQueue(const BlockingQueue&)=delete;
BlockingQueue& operator=(const BlockingQueue&)=delete;
void push(const T& data) {
std::unique_lock<std::mutex> lock(mtx);
while (queue.size() >= capacity) {
not_full.wait(lock, [&]{return queue.size() < capacity;});
}
queue.push(data);
not_empty.notify_all();
}
T pop() {
std::unique_lock<std::mutex> lock(mtx);
while (queue.empty()) {
not_empty.wait(lock, [&]{return !queue.empty();});
}
T res = queue.front();
queue.pop();
not_full.notify_all();
return res;
}
bool empty() {
std::unique_lock<std::mutex> lock(mtx);
return queue.empty();
}
size_t size() {
std::unique_lock<std::mutex> lock(mtx);
return queue.size();
}
void set_capacity(const size_t capacity) {
this->capacity = (capacity > 0 ? capacity : 10);
}
};
这对我来说很有效,但我不知道如果我在后台线程中启动它,我该如何关闭它
void main() {
BlockingQueue<float> q;
bool stop{false};
auto fun = [&] {
std::cout << "before entering loopn";
while (!stop) {
q.push(1);
}
std::cout << "after entering loopn";
};
std::thread t_bg(fun);
t_bg.detach();
// Some other tasks here
stop = true;
// How could I shut it down before quit here, or could I simply let the operation system do that when the whole program is over?
}
问题是,当我想关闭后台线程时,后台线程可能一直在休眠,因为队列已满,推送操作被阻止。当我希望后台线程停止时,我该如何停止它?
一个简单的方法是添加一个标志,当您想中止已经被阻止的pop()
操作时,可以从外部设置该标志。然后,您必须决定中止的pop()
将返回什么。一种方法是抛出异常,另一种方法则是返回std::optional<T>
。这是第一种方法(我只写更改的部分。(
在您认为合适的地方添加此类型:
struct AbortedPopException {};
将其添加到您的类字段中:
mutable std::atomic<bool> abort_flag = false;
另外添加此方法:
void abort () const {
abort_flag = true;
}
更改pop()
方法中的while
循环如下:(您根本不需要while
,因为我相信接受lambda的条件变量wait()
方法不会错误地唤醒/返回;即循环已经在等待中。(
not_empty.wait(lock, [this]{return !queue.empty() || abort_flag;});
if (abort_flag)
throw AbortedPopException{};
就是这样(我相信。(
在main()
中,当您想关闭"消费者"时,您可以在队列中调用abort()
。但是您还必须在那里处理抛出的异常。基本上,这是你的"退出"信号。
一些旁注:
不要脱离线程!特别是在AFAICT没有理由的情况下(还有一些实际的危险(。只需发出信号(以任何适当的方式(让他们退出并
join()
他们。您的
stop
标志应该是原子标志。你在后台线程中读取它,在主线程中写入它,这些可能(事实上(在时间上重叠,所以……数据竞赛!我不明白为什么你的队列中有"满"状态和"容量"。想想它们是否有必要。
更新1:响应OP关于分离的评论。。。以下是您的主线程中发生的情况:
- 生成"生产者"线程(即将内容推送到队列中的线程(
- 然后你做所有你想做的工作(例如消耗队列中的东西(
- 有时,可能在
main()
结束时,您向线程发出停止信号(例如,通过将stop
标志设置为true
( - 然后,也只有这样,你才能用线程
join()
确实,主线程在等待线程接收"停止"信号、退出循环并从线程函数返回时会阻塞,但这是一个非常非常短的等待。而且你没有其他事情可做。更重要的是,你会知道你的线程干净且可预测地退出了,从那时起,你肯定知道该线程不会运行(在这里对你来说不重要,但对其他线程任务可能至关重要。(
这是您通常希望在生成在短任务上循环的工作线程时遵循的模式。
更新2:关于队列的"已满"one_answers"容量"。没关系。这当然是你的决定。没问题。
更新3:关于"抛出"与返回"空"对象以发出中止的"阻塞pop()
"的信号。我不认为那样扔有什么错;特别是因为它非常非常罕见(只在生产者/消费者的操作结束时发生一次(。但是,如果要存储在Queue
中的所有T
类型都具有"无效"或"空"状态,那么您当然可以使用它。但投掷更为普遍,如果对一些人来说更"恶心"的话。