线程安全的无锁内存池:free函数在多线程中不能正常工作



我有一个相同大小缓冲区的线程安全分配器的简单实现。在内部,实现是一个非常简单的互锁单链表,它利用未分配缓冲区中未使用的空间来维护一个单链表。

我还写了一些测试:在单线程模式下测试代码时,一切似乎都没问题。我已设法将问题定位到Free函数,但就是找不出具体的错误所在。

我必须提到,我使用微软的互锁单链表(Interlocked SList)对完全相同的代码做了一些测试,它显然能正常工作,但我仍然想找出我自己实现中的问题。我甚至尝试过反汇编其代码并使用类似的内部函数(intrinsics),但没有帮助(还要注意,我不需要跟踪链表条目的数量,所以我认为不需要用于交换双寄存器大小元素的interlock函数,例如x64上的InterlockedCompareExchange128)

下面是分配器的代码:
#ifndef _POOLNOLOCK_HPP_
#define _POOLNOLOCK_HPP_
#include <windows.h>
// Pool of n fixed-size buffers whose free list is a singly linked stack
// threaded through the buffers themselves: `next` shares storage with `buff`
// (see the union in ELEMENT), so a free buffer stores the link in its own
// bytes. Alloc pops and Free pushes by compare-exchanging the head `ptrFree`.
// TSizeOfElem is the size in bytes of the buffer handed out by Alloc.
template<size_t TSizeOfElem>
class PoolNoLock {
public:
// Allocates N elements, links element i to element i + 1, terminates the
// chain with nullptr and clears every `allocated` flag.
// NOTE(review): with N == 0 the expression (n - 1) wraps around (size_t) and
// arr[n - 1] is out of range -- presumably callers always pass N >= 1; confirm.
PoolNoLock(size_t N) :
n(N),
arr(new ELEMENT[n])
{
for (size_t i = 0; (n - 1) > i; ++i)
{
arr[i].next = &arr[i + 1];
}
arr[n - 1].next = nullptr;
for (size_t i = 0; n > i; ++i)
{
arr[i].allocated = false;
}
}
~PoolNoLock() { delete[] arr; }
// Pops the head of the free list and returns a pointer to its buffer, or
// nullptr when the list is empty.
// The replacement head passed to the compare-exchange is allocBuff->next,
// which is read as a separate step before the exchange itself; the exchange
// only verifies that ptrFree still equals allocBuff, not that `next` is
// still the value the element holds.
void *Alloc()
{
ELEMENT *allocBuff;
do
{
allocBuff = ptrFree;
if (!allocBuff)
{
return nullptr;
}
} while (allocBuff != InterlockedCompareExchangePointer(
reinterpret_cast<void *volatile *>(&ptrFree),
allocBuff->next,
allocBuff
));
// Debug check: an element that was just popped must not already be marked
// as handed out.
if (allocBuff->allocated)
{
__debugbreak(); //will break here
}
allocBuff->allocated = true;
return &allocBuff->buff;
}
// Pushes a buffer back onto the free list.
// Address is reinterpreted as ELEMENT*, which matches what Alloc returns
// because `buff` is the first member of ELEMENT.
void Free(void *Address)
{
ELEMENT *const freeBuff = reinterpret_cast<ELEMENT *>(Address);
// Debug check: freeing an element that is not marked as handed out.
if (!freeBuff->allocated)
{
__debugbreak();
}
freeBuff->allocated = false;
ELEMENT *cmpFree = ptrFree;
do
{
// Link to the head we expect, then try to install ourselves as the new head.
freeBuff->next = cmpFree;
ELEMENT *const xchgFree =
reinterpret_cast<ELEMENT *>(InterlockedCompareExchangePointer(
reinterpret_cast<void *volatile *>(&ptrFree),
freeBuff,
cmpFree
));
if (xchgFree == cmpFree)
{
break;
}
// The head changed underneath us: retry against the value the exchange returned.
cmpFree = xchgFree;
} while (true);
}
private:
// One pool slot. While the slot is on the free list `next` is live; once it
// is handed out the same bytes are the caller's `buff`.
typedef struct _ELEMENT {
union {
_ELEMENT *next;
unsigned char buff[TSizeOfElem];
};
bool allocated; //debug info
}ELEMENT;
const size_t n;
ELEMENT *const arr; //array of list elements
ELEMENT *volatile ptrFree = &arr[0]; //head of "singly" linked list
};
#endif // _POOLNOLOCK_HPP_
下面是我用来对类进行压力测试的代码:
  1. 64是WaitForMultipleObjects可以等待的最大对象数
  2. 需要线程中的等待来帮助实现一个场景,其中尽可能多的线程正在访问资源
  3. 生成的线程数恰好等于分配器中的元素数,这就是仅分配(alloc-only)测试能够通过的原因
#include "PoolNoLock.hpp"
#include <vector>
#include <map>
#include <iostream>
// Worker threads per run; 64 is the most handles WaitForMultipleObjects can wait on.
static constexpr size_t N_THREAD = 64;
// How many times RunMultiple repeats a run (each run gets a fresh pool).
static constexpr size_t N_TEST_RUN = 4;
// Alloc/Free pairs executed by each worker in the alloc/free test.
static constexpr size_t N_ALLOC_FREE = 1024;
// Argument block passed to one worker thread.
struct ThreadParam {
    // Pool the worker operates on; this struct never frees it.
    PoolNoLock<sizeof(size_t)> *allocator;
    // Event the worker waits on before touching the pool.
    const HANDLE &hStartEvent;
    // Filled in by the alloc-only worker with the address it obtained.
    void *addressAlloc = nullptr;

    ThreadParam(PoolNoLock<sizeof(size_t)> *Allocator, const HANDLE &StartEvent)
        : allocator(Allocator)
        , hStartEvent(StartEvent)
    {
    }
};
// Multi-threaded stress test for PoolNoLock.
//   TAllocOnly == true : every worker performs exactly one Alloc; afterwards
//                        the harness verifies that all returned addresses are
//                        non-null and pairwise distinct, then frees them.
//   TAllocOnly == false: every worker performs N_ALLOC_FREE Alloc/Free pairs.
template<bool TAllocOnly>
class Test {
public:
    ~Test()
    {
        CloseHandle(hStartEvent);
    }

    // Runs one round of N_THREAD workers against *Allocator.
    // Returns true only when every Win32 call succeeded, every worker exited
    // with code 0 and (alloc-only mode) all returned addresses were unique.
    bool RunSingle(PoolNoLock<sizeof(size_t)> *Allocator)
    {
        std::vector<ThreadParam> params(N_THREAD, ThreadParam(Allocator, hStartEvent));
        if (TRUE != ResetEvent(hStartEvent))
        {
            return false;
        }
        // Create all workers suspended so that none of them runs before the
        // whole set exists.
        for (size_t i = 0; N_THREAD != i; ++i)
        {
            handles[i] = CreateThread(nullptr,
                                      0,
                                      threadProc,
                                      &params[i],
                                      CREATE_SUSPENDED,
                                      &tids[i]);
            if (!handles[i])
            {
                return false;
            }
        }
        // ResumeThread returns the previous suspend count; 1 means the thread
        // was suspended exactly once (by CREATE_SUSPENDED) and now runs.
        for (HANDLE handle : handles)
        {
            if (1 != ResumeThread(handle))
            {
                return false;
            }
        }
        // Every worker blocks on the manual-reset event; signalling it
        // releases them all at once to maximise contention on the pool.
        if (TRUE != SetEvent(hStartEvent))
        {
            return false;
        }
        if ((WAIT_OBJECT_0 + N_THREAD - 1) < WaitForMultipleObjects(N_THREAD, handles, TRUE, INFINITE))
        {
            return false;
        }
        for (size_t i = 0; N_THREAD != i; ++i)
        {
            if (WAIT_OBJECT_0 != WaitForSingleObject(handles[i], 0))
            {
                return false;
            }
            DWORD exitCode;
            if (TRUE != GetExitCodeThread(handles[i], &exitCode))
            {
                return false;
            }
            if (0 != exitCode)
            {
                return false;
            }
            if (TRUE != CloseHandle(handles[i]))
            {
                return false;
            }
        }
        if (TAllocOnly)
        {
            std::map<void *, DWORD> threadAllocations;
            for (size_t i = 0; N_THREAD != i; ++i)
            {
                void *const address = params[i].addressAlloc;
                if (!address)
                {
                    return false;
                }
                // insert() refuses an existing key and reports it through
                // .second, so an address handed out twice fails the run here.
                // (The earlier separate find() check was inverted and rejected
                // the very first, necessarily unseen, address.)
                if (!threadAllocations.insert(std::make_pair(address, tids[i])).second)
                {
                    return false;
                }
                Allocator->Free(address);
            }
            if (N_THREAD != threadAllocations.size())
            {
                return false;
            }
        }
        // All checks passed.
        return true;
    }

    // Repeats RunSingle N_TEST_RUN times, each time on a freshly constructed
    // pool of N_THREAD elements. Stops at, and reports, the first failing run.
    bool RunMultiple()
    {
        for (size_t i = 0; N_TEST_RUN != i; ++i)
        {
            PoolNoLock<sizeof(size_t)> allocator(N_THREAD);
            if (!RunSingle(&allocator))
            {
                return false;
            }
        }
        return true;
    }

private:
    // Manual-reset event, initially non-signalled, used as the start gate.
    const HANDLE hStartEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
    HANDLE handles[N_THREAD] = { nullptr };
    DWORD tids[N_THREAD] = { 0 };

    // Worker for the alloc-only test.
    // Exit codes: 0 success, 2 wait on the start event failed, 3 Alloc returned nullptr.
    static DWORD WINAPI ThreadProcAllocOnly(_In_ ThreadParam *Param)
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
        {
            return 2;
        }
        Param->addressAlloc = Param->allocator->Alloc();
        if (!Param->addressAlloc)
        {
            return 3;
        }
        return 0;
    }

    // Worker for the alloc/free test: N_ALLOC_FREE back-to-back Alloc/Free pairs.
    // Exit codes: 0 success, 2 wait on the start event failed, 3 Alloc returned nullptr.
    static DWORD WINAPI ThreadProcAllocFree(_In_ ThreadParam *Param)
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
        {
            return 2;
        }
        for (size_t i = 0; N_ALLOC_FREE != i; ++i)
        {
            void *ptrTest = Param->allocator->Alloc();
            if (!ptrTest)
            {
                return 3;
            }
            Param->allocator->Free(ptrTest);
        }
        return 0;
    }

    // Thread entry point selected by the template argument.
    const LPTHREAD_START_ROUTINE threadProc =
        TAllocOnly
        ? reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocOnly)
        : reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocFree);
};
// Runs the alloc-only suite first and the alloc/free suite second; prints a
// failure message and exits with 1 as soon as either suite reports failure.
int main()
{
    Test<true> testAllocOnly0;
    Test<false> TestAllocFree0;

    // The question's author reports that this suite succeeds.
    const bool allocOnlyPassed = testAllocOnly0.RunMultiple();
    if (!allocOnlyPassed)
    {
        std::cout << "Test failed" << std::endl;
        return 1;
    }
    std::cout << "Alloc-ONLY tests succeeded" << std::endl;

    // The question's author reports that this suite fails.
    const bool allocFreePassed = TestAllocFree0.RunMultiple();
    if (!allocFreePassed)
    {
        std::cout << "Test failed" << std::endl;
        return 1;
    }
    std::cout << "Alloc/free tests succeeded" << std::endl
              << "All tests succeeded" << std::endl;
    return 0;
}

您的错误出在Alloc()例程中,更准确地说是下面这一行:

InterlockedCompareExchangePointer(
reinterpret_cast<void *volatile *>(&ptrFree),
allocBuff->next, // <-- !!!
allocBuff)

这里有2个操作:首先CPU通过allocBuff指针读取allocBuff->next,然后在ptrFree上尝试CAS。但这2个操作合在一起并不是原子的,两者之间可能被打断。当你真正使用allocBuff->next时,allocBuff可能已经被另一个线程分配出去,next可能已经被覆盖成垃圾值(例如,不再是有效的指针)。

  • T#1从ptrFree读取allocBuff
  • T#2从ptrFree读取allocBuff
  • T#2将allocBuff从ptrFree弹出并返回给用户
  • T#2用用户数据覆盖allocBuff->next的内容,假设写入的是-1
  • T#1读取next = allocBuff->next,得到的是用户数据(-1)
  • T#2释放allocBuff,将其重新压回ptrFree
  • T#1的CAS成功,因为ptrFree再次指向allocBuff,于是把-1写入ptrFree
  • T#2从ptrFree读取到-1

这里甚至只需要栈中有一个元素(a)和2个线程就足以演示。仍用同样的例子:栈由头(F)和元素(a)组成。初始状态:F → a → 0

  • T#1 从F读取a
  • T#2 从F读取a
  • T#2 从a读取0
  • T#2 将0写入F并把a返回给用户:F -> 0
  • T#2 将-1写入a:a = -1;
  • T#1 从a读取-1
  • T#2 释放a:F -> a -> 0
  • T#1 将-1写入F并把a返回给用户:F -> -1
  • T#2 从F读取-1

这里还可能存在另一种竞争:

F - a - b - c

你从F读取a、从a读取b,准备弹出a——即把b赋值给F。但是在你完成这一步之前,另一个线程先弹出了a,所以现在:

F - b - c

然后弹出b:

F - c

再压入/释放a:

F - a - c

因为F再次指向a,你的CAS会成功,于是你得到了这样的链:

F - b - trash

因为b此时实际上已经被分配出去(正在使用中)。


无论如何,你的实现都远不是最好的。这里不需要模板,TSizeOfElem和n只在初始化过程中需要知道,仅此而已。压力测试也不需要很多线程,而是需要在关键点加入延迟:

// Marks a freshly popped entry as allocated by overwriting its link field,
// then blocks on a message box (captioned with the current thread id) so
// that another thread can be stepped while this one is parked.
void NotifyAllocated(PSINGLE_LIST_ENTRY allocBuff)
{
    // The link field doubles as the "allocated" marker.
    allocBuff->Next = reinterpret_cast<PSINGLE_LIST_ENTRY>(INVALID_HANDLE_VALUE); // allocated = true;

    WCHAR caption[16];
    WCHAR message[64];
    swprintf_s(caption, _countof(caption), L"%x", GetCurrentThreadId());
    swprintf_s(message, _countof(message), L"alocated = %p", allocBuff);

    MessageBoxW(0, message, caption, MB_ICONINFORMATION); // simulate delay !
}
// Formats `buf` with the caller-supplied format string and blocks on a
// warning message box captioned with the current thread id; used as a manual
// delay point inside the pool's Alloc.
void NotifyCheck(PVOID buf, PCWSTR fmt)
{
    WCHAR caption[16];
    WCHAR message[64];
    swprintf_s(caption, _countof(caption), L"%x", GetCurrentThreadId());
    swprintf_s(message, _countof(message), fmt, buf);

    MessageBoxW(0, message, caption, MB_ICONWARNING); // simulate delay !
}
// Demonstration pool: same pointer-only compare-exchange scheme as the
// templated version, with message-box delay points inserted inside Alloc so
// the interleaving of two threads can be driven by hand.
class PoolNoLock 
{
PVOID _arr = 0;                     //array of list elements
PSINGLE_LIST_ENTRY _ptrFree = 0;    //head of "singly" linked list
public:
// Allocates N elements of SizeOfElem bytes (raised to at least
// sizeof(SINGLE_LIST_ENTRY)) and chains them into the free list.
// Returns false when N is 0, true otherwise.
bool Init(size_t N, size_t SizeOfElem)
{
if (N)
{
if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
{
SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
}
// Anonymous union: the same cursor is advanced as a byte pointer (buf) and
// dereferenced as a list entry (Next) without casts.
union {
PUCHAR buf;
PSINGLE_LIST_ENTRY Next;
};
if (buf = new UCHAR[N * SizeOfElem])
{
_arr = buf;
PSINGLE_LIST_ENTRY ptrFree = 0;
// Each element links to the one visited before it, so the element visited
// last (highest address) ends up as the head of the list.
do 
{
Next->Next = ptrFree;
ptrFree = Next;
} while (buf += SizeOfElem, --N);
_ptrFree = ptrFree;
}
return true;
}
return false;
}
// Releases the backing array, if Init ever allocated one.
// NOTE(review): `buf` is a PVOID here; delete[] through a void pointer is not
// well-defined in standard C++ -- consider keeping the array as PUCHAR.
~PoolNoLock() 
{ 
if (PVOID buf = _arr)
{
delete[] buf; 
}
}
// Pops the head entry; returns 0 when the list is empty.
// allocBuff->Next is read into a local, a message box is shown, and only then
// is the compare-exchange issued: the pause between the read and the exchange
// is where another thread can be allowed to pop and push the same entry.
void *Alloc()
{
PSINGLE_LIST_ENTRY allocBuff, ptrFree = _ptrFree;
for (;;)
{
allocBuff = ptrFree;
if (!allocBuff)
{
return 0;
}
NotifyCheck(allocBuff, L"try: %p");
// access allocBuff->Next !!
PSINGLE_LIST_ENTRY Next = allocBuff->Next;
NotifyCheck(Next, L"next: %p");
ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Next, allocBuff);
//ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, allocBuff->Next, allocBuff);
// The exchange returns the previous head: equal to allocBuff means the pop
// took effect, anything else is the new head to retry with.
if (ptrFree == allocBuff)
{
NotifyAllocated(allocBuff);
return allocBuff;
}
}
}
// Pushes Address back as the new head: link it to the expected head, attempt
// the compare-exchange, and retry with the returned head on failure.
void Free(void *Address)
{
PSINGLE_LIST_ENTRY ptrFree = _ptrFree, newFree;
for ( ; ; ptrFree = newFree)
{
reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree;
newFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Address, ptrFree);
if (newFree == ptrFree)
{
return ;
}
}
}
};
// Worker for the hand-stepped demo: two rounds, each announced by a message
// box (caption = thread id, text = remaining round count), followed by one
// Alloc and, if it returned a buffer, one Free.
ULONG ThreadTest(PoolNoLock* p)
{
    for (ULONG n = 2; n != 0; --n)
    {
        WCHAR caption[16];
        WCHAR message[32];
        swprintf_s(caption, _countof(caption), L"%x", GetCurrentThreadId());
        swprintf_s(message, _countof(message), L"loop %x", n);
        MessageBoxW(0, message, caption, MB_OK);

        void* const buf = p->Alloc();
        if (buf != 0)
        {
            p->Free(buf);
        }
    }
    return 0;
}
void DemoTest()
{
PoolNoLock p;
if (p.Init(1, sizeof(PVOID)))
{
ULONG n = 2;
do 
{
CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
} while (--n);
}
MessageBoxW(0, 0, L"Wait", MB_OK);
}

这与您的代码相同,只是做了优化。bug也相同——位于:

ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
(void**)&_ptrFree, allocBuff->Next, allocBuff);

为了便于测试和更好地理解,最好写成:

PSINGLE_LIST_ENTRY Next = allocBuff->Next;
// delay !!
ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
(void**)&_ptrFree, Next, allocBuff);

为了解决这个问题,系统使用了SLIST_HEADER——它实际上是“指向栈顶的指针 + 操作计数”的组合。如果不直接使用SLIST_HEADER及其API,正确的实现可以如下:

class PoolNoLock 
{
PVOID _arr = 0;                     //array of list elements
struct U  
{
PSINGLE_LIST_ENTRY ptr; //head of "singly" linked list
ULONG_PTR allocCount;
void operator = (U& v)
{
ptr = v.ptr;
allocCount = v.allocCount;
}
U(U* v)
{
ptr = v->ptr->Next;
allocCount = v->allocCount + 1;
}
U(PSINGLE_LIST_ENTRY ptr, ULONG_PTR allocCount) : ptr(ptr), allocCount(allocCount)
{
}
U() : ptr(0), allocCount(0)
{
}
} u;
//++ debug
LONG _allocMiss = 0;
LONG _freeMiss = 0;
//-- debug
public:
bool Init(size_t N, size_t SizeOfElem)
{
if (N)
{
if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
{
SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
}
union {
PUCHAR buf;
PSINGLE_LIST_ENTRY Next;
};
if (buf = new UCHAR[N * SizeOfElem])
{
_arr = buf;
PSINGLE_LIST_ENTRY ptrFree = 0;
do 
{
Next->Next = ptrFree;
ptrFree = Next;
} while (buf += SizeOfElem, --N);
u.ptr = ptrFree;
u.allocCount = 0;
}
return true;
}
return false;
}
~PoolNoLock() 
{ 
if (PVOID buf = _arr)
{
delete[] buf; 
}
}
void *Alloc()
{
for (;;)
{
U allocBuff = u;
if (!allocBuff.ptr)
{
return 0;
}
U Next(&allocBuff);
if (InterlockedCompareExchange128((LONG64*)&u, Next.allocCount, (LONG64)Next.ptr, (LONG64*)&allocBuff))
{
// for debug only
allocBuff.ptr->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE;
return allocBuff.ptr;
} 
// for debug only
InterlockedIncrementNoFence(&_allocMiss);
}
}
void Free(void *Address)
{
for ( ; ; )
{
U ptrFree = u;
U a(reinterpret_cast<PSINGLE_LIST_ENTRY>(Address), ptrFree.allocCount);

reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree.ptr;
if (InterlockedCompareExchange128((LONG64*)&u, a.allocCount, (LONG64)a.ptr, (LONG64*)&ptrFree))
{
return ;
}
// for debug only
InterlockedIncrementNoFence(&_freeMiss);
}
}
};
// Stress worker: 0x10000 rounds of Alloc followed, when a buffer was
// obtained, by Free of that same buffer.
ULONG ThreadTest(PoolNoLock* p)
{
    for (ULONG remaining = 0x10000; remaining != 0; --remaining)
    {
        void* const buf = p->Alloc();
        if (buf != 0)
        {
            p->Free(buf);
        }
    }
    return 0;
}
void DemoTest()
{
PoolNoLock p;
if (p.Init(16, sizeof(PVOID)))
{
ULONG n = 8;
do 
{
CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
} while (--n);
}
MessageBoxW(0, 0, L"Wait", MB_OK);
}

最新更新