如何退出C++阻塞队列



在阅读了其他一些文章后,我知道我可以实现这样的c++阻塞队列:

template<typename T>
class BlockingQueue {
public:
std::mutex mtx;
std::condition_variable not_full;
std::condition_variable not_empty;
std::queue<T> queue;
size_t capacity{5};
BlockingQueue()=default;
BlockingQueue(int cap):capacity(cap) {}
BlockingQueue(const BlockingQueue&)=delete;
BlockingQueue& operator=(const BlockingQueue&)=delete;
void push(const T& data) {
std::unique_lock<std::mutex> lock(mtx);
while (queue.size() >= capacity) {
not_full.wait(lock, [&]{return queue.size() < capacity;});
}
queue.push(data);
not_empty.notify_all();
}
T pop() {
std::unique_lock<std::mutex> lock(mtx);
while (queue.empty()) {
not_empty.wait(lock, [&]{return !queue.empty();});
}
T res = queue.front();
queue.pop();
not_full.notify_all();
return res;
}
bool empty() {
std::unique_lock<std::mutex> lock(mtx);
return queue.empty();
}
size_t size() {
std::unique_lock<std::mutex> lock(mtx);
return queue.size();
}
void set_capacity(const size_t capacity) {
this->capacity = (capacity > 0 ? capacity : 10);
}
};

这对我来说很有效,但我不知道如果我在后台线程中启动它,我该如何关闭它

void main() {
BlockingQueue<float> q;
bool stop{false};
auto fun = [&] {
std::cout << "before entering loopn";
while (!stop) {
q.push(1);
}
std::cout << "after entering loopn";
};
std::thread t_bg(fun);
t_bg.detach();
// Some other tasks here
stop = true;
// How could I shut it down before quit here, or could I simply let the operation system do that when the whole program is over?
}

问题是,当我想关闭后台线程时,后台线程可能一直在休眠,因为队列已满,推送操作被阻止。当我希望后台线程停止时,我该如何停止它?

一个简单的方法是添加一个标志,当您想中止已经被阻止的pop()操作时,可以从外部设置该标志。然后,您必须决定中止的pop()将返回什么。一种方法是抛出异常,另一种方法则是返回std::optional<T>。这是第一种方法(我只写更改的部分。(

在您认为合适的地方添加此类型:

struct AbortedPopException {};

将其添加到您的类字段中:

mutable std::atomic<bool> abort_flag = false;

另外添加此方法:

void abort () const {
abort_flag = true;
}

更改pop()方法中的while循环如下:(您根本不需要while,因为我相信接受lambda的条件变量wait()方法不会错误地唤醒/返回;即循环已经在等待中。(

not_empty.wait(lock, [this]{return !queue.empty() || abort_flag;});
if (abort_flag)
throw AbortedPopException{};

就是这样(我相信。(

main()中,当您想关闭"消费者"时,您可以在队列中调用abort()。但是您还必须在那里处理抛出的异常。基本上,这是你的"退出"信号。

一些旁注:

  1. 不要脱离线程!特别是在AFAICT没有理由的情况下(还有一些实际的危险(。只需发出信号(以任何适当的方式(让他们退出并join()他们。

  2. 您的stop标志应该是原子标志。你在后台线程中读取它,在主线程中写入它,这些可能(事实上(在时间上重叠,所以……数据竞赛!

  3. 我不明白为什么你的队列中有"满"状态和"容量"。想想它们是否有必要。

更新1:响应OP关于分离的评论。。。以下是您的主线程中发生的情况:

  1. 生成"生产者"线程(即将内容推送到队列中的线程(
  2. 然后你做所有你想做的工作(例如消耗队列中的东西(
  3. 有时,可能在main()结束时,您向线程发出停止信号(例如,通过将stop标志设置为true(
  4. 然后,也只有这样,你才能用线程join()

确实,主线程在等待线程接收"停止"信号、退出循环并从线程函数返回时会阻塞,但这是一个非常非常短的等待。而且你没有其他事情可做。更重要的是,你会知道你的线程干净且可预测地退出了,从那时起,你肯定知道该线程不会运行(在这里对你来说不重要,但对其他线程任务可能至关重要。(

这是您通常希望在生成在短任务上循环的工作线程时遵循的模式。

更新2:关于队列的"已满"one_answers"容量"。没关系。这当然是你的决定。没问题。

更新3:关于"抛出"与返回"空"对象以发出中止的"阻塞pop()"的信号。我不认为那样扔有什么错;特别是因为它非常非常罕见(只在生产者/消费者的操作结束时发生一次(。但是,如果要存储在Queue中的所有T类型都具有"无效"或"空"状态,那么您当然可以使用它。但投掷更为普遍,如果对一些人来说更"恶心"的话。

最新更新