c++11具有2个线程的无锁队列



除了主线程,我还有一个线程接收数据并将其写入文件。

std::queue<std::vector<int>> dataQueue;
std::mutex mutex;
void setData(const std::vector<int>& data) {
std::lock_guard<std::mutex> lock(mutex);
dataQueue.push(data);
}
void write(const std::string& fileName) {
std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));
while (store) {
std::lock_guard<std::mutex> lock(mutex);
while (!dataQueue.empty()) {
std::vector<int>& data= dataQueue.front();
ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());
dataQueue.pop();
}
}
}
}

setData由主线程使用,而write实际上是写入线程。我使用std::lock_quard来避免内存冲突,但当锁定写线程时,它会减慢主线程的速度,因为它必须等待队列解锁。但我想我可以避免这种情况,因为线程永远不会同时作用于队列的同一元素。

所以我想在不锁定的情况下实现它,但我真的不明白该如何实现。我的意思是,我怎么能在不锁定任何东西的情况下做到这一点?此外,如果写线程比主线程快,那么队列可能在大多数时间都是空的,因此它应该以某种方式等待新数据,而不是无限循环来检查非空队列。

EDIT:我用std::cond_variable更改了简单的std::lock_guard,以便在队列为空时等待。但是主线程仍然可以被阻塞,因为当cvQeue.wait(.)被解析时,它会重新获取锁。此外,如果主线程执行cvQueue.notify_one(),但写入线程没有等待,该怎么办?

std::queue<std::vector<int>> dataQueue;
std::mutex mutex;
std::condition_variable cvQueue;
void setData(const std::vector<int>& data) {
std::unique_lock<std::mutex> lock(mutex);
dataQueue.push(data);
cvQueue.notify_one();
}
void write(const std::string& fileName) {
std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));
while (store) {
std::lock_guard<std::mutex> lock(mutex);
while (!dataQueue.empty()) {
std::unique_lock<std::mutex> lock(mutex);
cvQueue.wait(lock);
ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());
dataQueue.pop();
}
}
}
}

如果只有两个线程,则可以使用无锁的单生产者-单消费者(SPSC(队列
可以在此处找到有界版本:https://github.com/rigtor/SPSCQueue
Dmitry Vyukov在这里展示了一个无界版本:http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue(不过,您应该注意,此代码应该适合使用原子。(

关于阻塞弹出操作-这是无锁数据结构所不提供的,因为这样的操作显然不是无锁的。然而,以这样一种方式调整链接的实现应该是相对直接的,即如果在推送之前队列是空的,则推送操作会通知条件变量。

我想我有一些东西可以满足我的需求。我做了一个使用std::atomicLockFreeQueue。因此,我可以原子化地管理队列的头/尾的状态。

template<typename T>
class LockFreeQueue {
public:
void push(const T& newElement) {
fifo.push(newElement);
tail.fetch_add(1);
cvQueue.notify_one();
}
void pop() {
size_t oldTail = tail.load();
size_t oldHead = head.load();
if (oldTail == oldHead) {
return;
}
fifo.pop();
head.store(++oldHead);
}
bool isEmpty() {
return head.load() == tail.load();
}
T& getFront() {
return fifo.front();
}
void waitForNewElements() {
if (tail.load() == head.load()) {
std::mutex m;
std::unique_lock<std::mutex> lock(m);
cvQueue.wait_for(lock, std::chrono::milliseconds(TIMEOUT_VALUE));
}
}
private:
std::queue<T> fifo;
std::atomic<size_t> head = { 0 };
std::atomic<size_t> tail = { 0 };
std::condition_variable cvQueue;
};
LockFreeQueue<std::vector<int>> dataQueue;
std::atomic<bool> store(true);
void setData(const std::vector<int>& data) {
dataQueue.push(data);
// do other things
}
void write(const std::string& fileName) {
std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));
while (store.load()) {
dataQueue.waitForNewElements();
while (!dataQueue.isEmpty()) {
std::vector<int>& data= dataQueue.getFront();
ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());
dataQueue.pop();
}
}
}
}

我在waitForNewElements中还有一个锁,但它并没有锁定整个过程,因为它在等待事情的发生。但最大的改进是,生产者可以在消费者弹出的同时推动。只有当LockFreQueue::tailLockFreeQueue::head相同时才被禁止。这意味着队列为空并且进入等待状态。

我不太满意的是cvQueue.wait_for(lock, TIMEOUT_VALUE)。我想做一个简单的cvQueue.wait(lock),但问题是在结束线程时,我在主线程中做store.store(false)。因此,如果写线程正在等待,它永远不会在没有超时的情况下结束。因此,我设置了一个足够大的超时,这样condition_variable的大部分时间都由锁解析,当线程结束时,它由超时解析。

如果你觉得某些地方一定有问题或需要改进,请随时发表评论。

最新更新