我正在用c++为实时系统编写一个无锁的单消费者单生产者可增长队列。内部队列可以工作,但它需要是可增长的。生产者线程是实时的,因此任何操作都需要是确定性的(因此没有等待、锁、内存分配),而消费者线程则不是。
因此,如果需要的话,消费者线程偶尔会增加队列的大小。队列的实现使得消费者端无法增长。因此,实际的队列被间接地封装在一个对象中,该对象分配调用,实际的增长是通过将对内部队列的引用交换到一个新的来实现的,同时保留旧的队列,以防生产者线程正在使用它。问题是,然而,我无法弄清楚如何证明生产者线程何时停止使用旧队列,因此删除是安全的,而不必诉诸锁。下面是代码的伪表示:
template<typename T>
class queue
{
public:
queue()
: old(nullptr)
{
current.store(nullptr);
grow();
}
bool produce(const T & data)
{
qimpl * q = current.load();
return q->produce(data);
}
bool consume(T & data)
{
// the queue has grown? if so, a new and an old queue exists. consume the old firstly.
if (old)
{
// here is the problem. we never really know when the producer thread stops using
// the old queue and starts using the new. it could be concurrently halfway-through inserting items
// now, while the following consume call fails meanwhile.
// thus, it is not safe yet to delete the old queue.
// we know however, that it will take at most one call to produce() after we called grow()
// before the producer thread starts using the new queue.
if (old->consume(data))
{
return true;
}
else
{
delete old;
old = nullptr;
}
}
if (current.load()->consume(data))
{
return true;
}
return false;
}
// consumer only as well
void grow()
{
old = current.load();
current.store(new qimlp());
}
private:
class qimpl
{
public:
bool produce(const T & data);
bool consume(const T & data);
};
std::atomic<qimpl *> current;
qimpl * old;
};
注意ATOMIC_POINTER_LOCK_FREE == 2是代码编译的条件。我看到的唯一可证明的条件是,如果调用grow(),下一次调用produce()将使用新的内部队列。因此,如果每次调用都增加produce中的原子计数,那么在N + 1处删除旧队列是安全的,其中N是grow()调用时的计数。然而,问题是,您需要自动交换新指针并存储计数,这似乎是不可能的。
欢迎大家有任何想法,作为参考,以下是系统的工作原理:
queue<int> q;
void consumer()
{
while (true)
{
int data;
if (q.consume(data))
{
// ..
}
}
}
void producer()
{
while (true)
{
q.produce(std::rand());
}
}
int main()
{
std::thread p(producer); std::thread c(consumer);
p.detach(); c.detach();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
编辑:好了,现在问题解决了。我突然意识到,当一个项目被推到新队列中时,旧队列显然已经过时了。因此,代码片段现在看起来像这样:
bool pop(T & data)
{
if (old)
{
if (old->consume(data))
{
return true;
}
}
// note that if the old queue is empty, and the new has an enqueued element, we can conclusively
// prove that it is safe to delete the old queue since it is (a) empty and (b) the thread state
// for the producer is updated such that it uses all the new entities and will never use the old again.
// if we successfully dequeue an element, we can delete the old (if it exists).
if (current.load()->consume(data))
{
if (old)
{
delete old;
old = nullptr;
}
return true;
}
return false;
}
我不完全理解grow()
在你的算法中的使用,但似乎你需要某种Read-Copy-Update (RCU)机制来安全删除不需要的队列。
这篇文章描述了与Linux相关的这种机制的不同风格,但你可以谷歌RCU风格,适合其他平台。