C++中的多线程生产者/使用者



我正在研究多线程并编写了一个基本的生产者/消费者。我对下面写的生产者/消费者有两个问题。1(即使将消费者的睡眠时间设置为低于生产者的睡眠时间,生产者似乎仍然执行得更快。2(在消费者中,在生产者完成添加到队列的情况下,我已经复制了代码,但队列中仍有元素。有什么建议可以更好地构建代码吗?

#include <iostream>
#include <queue>
#include <mutex>
class App {
private:
std::queue<int> m_data;
bool m_bFinished;
std::mutex m_Mutex;
int m_ConsumerSleep;
int m_ProducerSleep;
int m_QueueSize;
public:
App(int &MaxQueue) :m_bFinished(false), m_ConsumerSleep(1), m_ProducerSleep(5), m_QueueSize(MaxQueue){}
void Producer() {
for (int i = 0; i < m_QueueSize; ++i) {
std::lock_guard<std::mutex> guard(m_Mutex);
m_data.push(i); 
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
}
m_bFinished = true;
}
void Consumer() {
while (!m_bFinished) {
if (m_data.size() > 0) {
std::lock_guard<std::mutex> guard(m_Mutex);
std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
m_data.pop();
}
else {
std::cout << "No elements, skipping" << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
}
while (m_data.size() > 0) {
std::lock_guard<std::mutex> guard(m_Mutex);
std::cout << "Emptying remaining elements " << m_data.front() << std::endl;
m_data.pop();
std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
}
}
};

int main()
{
int QueueElements = 10;
App app(QueueElements);
std::thread consumer_thread(&App::Consumer, &app);
std::thread producer_thread(&App::Producer, &app);
producer_thread.join();
consumer_thread.join();

std::cout << "loop exited" << std::endl;
return 0;
}

你应该使用condition_variable。不要对线程使用睡眠。

主要方案: 生产者在锁定状态下推送值并发出condition_variable信号。

使用者在锁定条件变量的情况下等待,并检查谓词以防止虚假唤醒。

我的版本:

#include <iostream>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <atomic>
class App {
private:
std::queue<int> m_data;
std::atomic_bool m_bFinished;
std::mutex m_Mutex;
std::condition_variable m_cv;
int m_QueueSize;
public:
App(int MaxQueue) 
: m_bFinished(false)
, m_QueueSize(MaxQueue) 
{}
void Producer()
{
for (int i = 0; i < m_QueueSize; ++i) 
{
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_data.push(i); 
}
m_cv.notify_one();
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
}
m_bFinished = true;
}
void Consumer() 
{
do
{
std::unique_lock<std::mutex> lock(m_Mutex);
while (m_data.empty())
{
m_cv.wait(lock, [&](){ return !m_data.empty(); }); // predicate an while loop - protection from spurious wakeups
}
while(!m_data.empty()) // consume all elements from queue
{
std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
m_data.pop();
}
} while(!m_bFinished);
}
};

int main()
{
int QueueElements = 10;
App app(QueueElements);
std::thread consumer_thread(&App::Consumer, &app);
std::thread producer_thread(&App::Producer, &app);
producer_thread.join();
consumer_thread.join();
std::cout << "loop exited" << std::endl;
return 0;
}

另请注意,当您处理并发线程时,最好使用 atomic for end 标志,因为理论上m_bFinished的值将存储在缓存行中,如果生产者线程中没有缓存失效,则从使用者线程中看不到更改的值。原子有内存围栏,保证该值将针对其他线程进行更新。

您也可以查看memory_order页面。

首先,您应该使用条件变量,而不是对使用者的延迟。这样,使用者线程仅在队列不为空且生成者通知它时才会唤醒。

也就是说,生产者调用更频繁的原因是生产者线程上的延迟。它是在持有互斥锁时执行的,因此在延迟结束之前,使用者永远不会执行。您应该在调用sleep_for之前释放互斥锁:

for (int i = 0; i < m_QueueSize; ++i) {
/* Introduce a scope to release the mutex before sleeping*/
{
std::lock_guard<std::mutex> guard(m_Mutex);
m_data.push(i); 
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
} // Mutex is released here
std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
}

最新更新