我正在尝试实现一个使用两个线程的类:一个用于生产者,一个用于消费者。当前实现不使用锁:
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
using Queue =
boost::lockfree::spsc_queue<
int,
boost::lockfree::capacity<1024>>;
class Worker
{
public:
Worker() : working_(false), done_(false) {}
~Worker() {
done_ = true; // exit even if the work has not been completed
worker_.join();
}
void enqueue(int value) {
queue_.push(value);
if (!working_) {
working_ = true;
worker_ = std::thread([this]{ work(); });
}
}
void work() {
int value;
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
working_ = false;
}
private:
std::atomic<bool> working_;
std::atomic<bool> done_;
Queue queue_;
std::thread worker_;
};
应用程序需要将工作项排队一段时间,然后休眠等待事件。这是一个模拟行为的最小main:
int main()
{
Worker w;
for (int i = 0; i < 1000; ++i)
w.enqueue(i);
std::this_thread::sleep_for(std::chrono::seconds(1));
for (int i = 0; i < 1000; ++i)
w.enqueue(i);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
我很确定我的实现是有bug的:如果工作线程完成并在执行working_ = false
之前,另一个enqueue
来了呢?是否有可能使我的代码线程安全不使用锁?
解决方案要求:
- 快速排队
- 即使队列不为空,析构函数也必须退出
- 没有忙等待,因为工作线程有很长一段时间是空闲的
- 如果可能的话没有锁
根据您的建议,我做了另一个Worker
类的实现。这是我的第二次尝试:
class Worker
{
public:
Worker()
: working_(ATOMIC_FLAG_INIT), done_(false) { }
~Worker() {
// exit even if the work has not been completed
done_ = true;
if (worker_.joinable())
worker_.join();
}
bool enqueue(int value) {
bool enqueued = queue_.push(value);
if (!working_.test_and_set()) {
if (worker_.joinable())
worker_.join();
worker_ = std::thread([this]{ work(); });
}
return enqueued;
}
void work() {
int value;
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
working_.clear();
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
}
private:
std::atomic_flag working_;
std::atomic<bool> done_;
Queue queue_;
std::thread worker_;
};
我在enqueue
方法中引入了worker_.join()
。这可能会影响性能,但在极少数情况下(当队列为空并且线程退出之前,会出现另一个enqueue
)。working_
变量现在是atomic_flag
,在enqueue
中设置,在work
中清除。在working_.clear()
之后需要额外的while
,因为如果在clear
之前,但在while
之后推送另一个值,则不处理该值。
这个实现正确吗?
我做了一些测试,实现似乎可以工作。
OT:这是一个编辑,还是一个回答?如果工作线程完成并且在执行working_ = false之前,另一个队列来了怎么办?
则该值将被推入队列,但不会被处理,直到另一个值在设置标志后加入队列。您(或您的用户)可以决定这是否可以接受。可以使用锁来避免这种情况,但是它们违背了您的要求。
如果正在运行的线程即将完成并且设置了working_ = false;
,但是在next值进入队列之前还没有停止运行,代码可能会失败。在这种情况下,你的代码将在运行的线程上调用operator=,根据链接的文档调用std::terminate
。
在将worker分配给新线程之前添加worker_.join()
应该可以防止这种情况。
另一个问题是,如果队列已满,queue_.push
可能会失败,因为它的大小是固定的。目前,您只是忽略了这种情况,并且该值不会被添加到完整队列中。如果等待队列有空间,则无法获得快速队列(在边缘情况下)。您可以接受push
返回的bool值(告诉它是否成功),并从enqueue
返回它。这样调用者就可以决定是等待还是丢弃该值。
或者使用非固定大小的队列。Boost对这个选择是这样说的:
可以用来在push期间完全禁用动态内存分配,以确保无锁行为。如果数据结构配置为固定大小,则内部节点存储在数组中并对其进行寻址通过数组索引。这将队列的可能大小限制为索引可以寻址的元素数量类型(通常是2**16-2),但在缺乏双宽度比较和交换指令的平台上,这是最好的方法实现锁自由。
你的工作线程需要超过2个状态。
- 未运行 <
- 做任务/gh>
- 空闲关闭 关闭
如果强制关机,它会跳过空闲关机。如果您的任务用完,它将转换为空闲关机。在空闲关机时,它清空任务队列,然后进入关机状态。
设置关机,然后离开工作任务的末端。
生产者首先把东西放到队列中。然后检查工作器状态。如果Shutdown或Idle Shutdown,首先join
它(并将其转换为未运行),然后启动一个新的worker。如果没有运行,就启动一个新的worker。
如果生产者想要启动一个新的工作者,它首先要确保我们处于未运行状态(否则,逻辑错误)。然后切换到Doing tasks状态,然后启动工作线程。
如果生产者想要关闭helper任务,它会设置done标志。然后检查工作状态。如果它不是在运行,它就加入它。
这会导致一个工作线程无缘无故地启动。
在少数情况下,上面的语句可以阻塞,但在此之前也有一些情况。
然后,我们写一个正式或半正式的证明,证明上面的代码不会丢失消息,因为当你写无锁的代码时,直到你有一个证明,你才完成。
这是我对这个问题的解决方案。我不太喜欢回答自己,但我认为展示实际的代码可能会帮助别人。
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
// I used this semaphore class: https://gist.github.com/yohhoy/2156481
#include "binsem.hpp"
using Queue =
boost::lockfree::spsc_queue<
int,
boost::lockfree::capacity<1024>>;
class Worker
{
public:
// the worker thread starts in the constructor
Worker()
: working_(ATOMIC_FLAG_INIT), done_(false), semaphore_(0)
, worker_([this]{ work(); })
{ }
~Worker() {
// exit even if the work has not been completed
done_ = true;
semaphore_.signal();
worker_.join();
}
bool enqueue(int value) {
bool enqueued = queue_.push(value);
if (!working_.test_and_set())
// signal to the worker thread to wake up
semaphore_.signal();
return enqueued;
}
void work() {
int value;
// the worker thread continue to live
while (!done_) {
// wait the start signal, sleeping
semaphore_.wait();
while (!done_ && queue_.pop(value)) {
// perform actual work
std::cout << value << std::endl;
}
working_.clear();
while (!done_ && queue_.pop(value)) {
// perform actual work
std::cout << value << std::endl;
}
}
}
private:
std::atomic_flag working_;
std::atomic<bool> done_;
binsem semaphore_;
Queue queue_;
std::thread worker_;
};
我尝试了@Cameron的建议,不要关闭线程并添加一个信号量。这实际上只在第一个enqueue
和最后一个work
中使用。这不是无锁的,但只有在这两种情况下。
我做了一些性能比较,在我以前的版本(见我编辑的问题),这一个。没有明显的差别,开始和停止的时候也没有很多。然而,当enqueue
必须对工作线程进行signal
而不是启动新线程时,它的速度要快10倍。这是一种罕见的情况,所以它不是很重要,但无论如何这是一个改进。
这个实现满足:
- 一般情况下无锁(
enqueue
和work
忙时); - 请勿忙等待,以防长时间没有
enqueue
- 析构函数尽快退出
- 正确性?:)
非常片面的回答:我认为所有这些原子、信号量和状态都是一个反向通信通道,从"线程"到"Worker"。为什么不使用另一个队列呢?