(在两个误解答案之后的澄清:如果生产者线程数小于堆栈大小,则代码运行良好。只有 1 个消费者发布槽。我用 32 个制作人与 16 个插槽调整这个演示的方式是快速触发一个糟糕的条件)
在对用于多线程缓冲区管理的无锁堆栈进行压力测试时,我发现缓冲区内容的完整性无法得到保证。我现在非常确定堆栈/后进先出解决方案不是最佳选择;但我仍然想了解这些缓冲区是如何受到损害的。
这个想法是:一个无锁堆栈,其中包含指向"免费"缓冲区的指针。它们可以由许多生产者线程中的一个检索。然后,缓冲区被填充数据并"调度"到单个使用者线程,最终将它们返回到堆栈。
观察结果是:- 两个线程以某种方式获得相同的缓冲区。- 一个线程正在获取一个缓冲区,其内存仍未从刚刚释放它的另一个线程刷新。
以下是我可以放在一起的最简单的示例,用于演示:
更新:我为任何想要使用它的人制作了一个具有更好调试输出的版本,在这里:https://ideone.com/v9VAqU
#include <atomic>
#include <assert.h>
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;
#define N_SLOTS 16
#define N_THREADS 32
// The data buffers that are shared among threads
class Buffer { public: int data[N_THREADS] = {0}; } buffers[N_SLOTS];
// The lock-free stack under study
class LockFreeStack
{
Buffer* stack[N_SLOTS];
atomic_int free_slots, out_of_slots, retries;
public:
LockFreeStack() : free_slots(0), out_of_slots(0), retries(0) {
for (int i=0; i<N_SLOTS; i++)
release_buffer(&buffers[i]);
}
Buffer* get_buffer()
{
int slot = --free_slots;
if (slot < 0) {
out_of_slots++;
return nullptr;
}
/// [EDIT] CAN GET PREEMPTED RIGHT HERE, BREAKING ATOMICITY!
return stack[slot];
}
void release_buffer(Buffer* buf)
{
int slot;
while(true) {
slot = free_slots;
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
}
stack[slot] = buf;
if (free_slots++ == slot)
break;
retries++;
}
}
ostream& toStream(ostream& oss) {
return oss << "LockFreeStack with free_slots=" << free_slots << ", oos=" << out_of_slots << ", retries=" << retries;
}
} lockFreeStack;
// Utility class to help with test
class PrintQueue {
queue<Buffer*> q;
mutex m;
public:
void add(Buffer* buf) {
lock_guard<mutex> lock(m);
q.push(buf);
}
Buffer* pop() {
lock_guard<mutex> lock(m);
Buffer* buf;
if (q.empty())
return nullptr;
buf = q.front();
q.pop();
return buf;
}
} printQueue;
int main()
{
vector<thread> workers;
for (int t = 0; t < N_THREADS; ++t) {
workers.push_back(thread([&,t] {
while(true) {
auto buf = lockFreeStack.get_buffer();
if (buf) {
buf->data[t] = t;
this_thread::sleep_for(chrono::milliseconds(10));
printQueue.add(buf);
}
}
}));
}
while(true) {
this_thread::sleep_for(chrono::milliseconds(10));
lockFreeStack.toStream(cout) << endl;
Buffer *buf;
while((buf = printQueue.pop())) {
cout << "Got Buffer " << buf << " #" << (buf-buffers) << " { ";
int used = 0;
for(int t=0; t<N_THREADS; t++)
if (buf->data[t]) {
used += 1;
cout << 't' << buf->data[t] << ' ';
buf->data[t] = 0;
}
cout << "}n";
assert (used == 1);
lockFreeStack.release_buffer(buf);
}
}
return 0;
}
还有一个错误输出的示例:
> LockFreeStack with free_slots=-2454858, oos=2454836, retries=0
> Got Buffer 0x604a40 #12 { t7 }
> Got Buffer 0x6049c0 #11 { t8 }
> Got Buffer 0x604b40 #14 { t1 }
> Got Buffer 0x604bc0 #15 { }
> test.cpp:111: int main(): Assertion `used == 1' failed.
我试过到处使用std::atomic_thread_fence()
,但没有区别。
故障在哪里?
(顺便说一句,在多个版本的GCC上测试,包括5.2和4.6)
你的LockFreeStack代码完全被破坏了。
同时从 2 个线程调用release_buffer
可以将 2 个指针粘在同一插槽中,因此一个指针丢失。
if (free_slots++ == slot)
只会对一个线程成功,因此另一个线程将再次尝试并将其指针放在另一个插槽中。但它也可能是在第一个插槽中获胜的那个,所以你在 2 个插槽中得到相同的结果。
您可以使用 1 个线程调用release_buffer
和另一个调用get_buffer
获得相同的效果。其中一个或两个方案负责您的损坏。
release_buffer
没有限制在stack
的大小,所以预计缓冲区溢出,然后所有地狱都会松动。
我建议:
-
release_buffer
首先原子地选择一个唯一的插槽,然后写入它。 -
当多个发布者竞争插槽时,插槽中指针的写入顺序不能保证,因此您需要一些其他方法来将插槽标记为
release_buffer
有效,在get_buffer
上标记为无效。最简单的方法是将其清空get_buffer
. -
绑定到堆栈大小的计数器。如果你不能做一个原子操作,拿一个副本,做所有的改变,然后把它放回来。
编辑
下面是将同一缓冲区返回到 2 个单元格的场景:
////T==0 free_slots==5
// thread 1
void release_buffer(Buffer* buf) ////T==1 buf==buffers[7]
{
int slot;
while(true) { //// 1st iteration
slot = free_slots; ////T==2 free_slots==5 slot==5
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
} ////*** note other threads below ***
stack[slot] = buf; //// stack[5]==buffers[7]
if (free_slots++ == slot) ////T==5 free_slots==4 slot==5 ---> go for another round
break;
retries++;
}
while(true) { //// 2nd iteration
slot = free_slots; ////T==6 free_slots==4 slot==4
if (slot <= 0) {
stack[0] = buf;
free_slots = 1;
break;
}
stack[slot] = buf; //// stack[4]==buffers[7] //// BOOM!!!!
if (free_slots++ == slot) ////T==7 free_slots==5 slot==4 ---> no other round
break;
retries++;
}
}
// thread 2
Buffer* get_buffer() // thread
{
int slot = --free_slots; ////T==3 free_slots==4
if (slot < 0) {
out_of_slots++;
return nullptr;
}
return stack[slot];
}
// thread 3
Buffer* get_buffer()
{
int slot = --free_slots; ////T==4 free_slots==3
if (slot < 0) {
out_of_slots++;
return nullptr;
}
return stack[slot];
}
编辑 2:断言失败...
如果您现在还没有找到它,这里是:
//// producer t==0
buf->data[t] = t; //// buf->data[t] == 0
//consumer
for(int t=0; t<N_THREADS; t++) // first iteration, t==0
if (buf->data[t]) { //// buf->data[t] == 0, branch not taken
used += 1;
...
//// used remains ==0 -----> assert fails
在缓冲区中写入 t+1 将修复它。
感谢您的想法。我最终发现了问题:
- free_slots == N
- 来自消费线程 #2 的 get_buffer() 占用插槽 N-1,但在读取堆栈中的指针之前被抢占[N-1]
- release_buffer(buf)(单个生产者线程 #1)将新的缓冲区放在理论上空出的插槽上,精确堆叠[N-1]!
现在,最初在 stack[N-1] 中的指针已经丢失(内存泄漏),如果 get_buffer() 的下一个线程大约在同一时间唤醒,它将与线程 #2 相同。
我在Linux上使用gcc 5.3编译并执行了以下内容:
#include <atomic>
#include <iostream>
int main()
{
for (int i=0; i<5; ++i)
{
std::atomic_int n;
std::cout << n << std::endl;
n=4;
}
return 0;
}
结果输出如下:
306406976
4
4
4
4
由此,我得出结论,std::atomic_int
的构造函数并没有明确清除原子整数的初始值。必须显式初始化它。我想验证这个事实,因为我对原子库不是很熟悉。我的结果表明,std::atomic_int
必须显式初始化,它们不会自动初始化为 0。
系统提示我根据以下观察结果验证std::atomic_int
是否正在初始化:
在这里,
LockFreeStack
构造函数也不会显式初始化std::atomic_int
类成员。构造函数调用
release_buffer()
方法。release_buffer
() 方法读取并使用free_slots
。
由此,我必须得出结论,这是未定义的行为。