解决:锁定自由队列多生产者多消费者-内存损坏



LockFreeQueueMPMC应该在没有锁的情况下解决MPMC问题,但在运行时内存损坏。LockFreeDispatchStackMPMC在没有锁的情况下解决了MPMC问题,并用作LockFreeCacheMPMC分配器的基础。这两种实现都通过了压力测试。

LockFreeQueueMPMC Enqueue执行与LockFreeDispatchStackMPMC Send相同的操作。这将向列表中添加一个新节点。排队操作更为复杂。一次只能有一个指针是cmpexg,因此没有使用Tail指针的解决方案。为了从列表中删除节点,需要遍历列表并删除最后一个节点。这会将出列时间从O(1(更改为O(N(,但无锁定。

LockFreeDispatchStackMPMC是一个无序的MPMC无锁解决方案。最早到达的消息将首先得到服务。这使用堆栈而不是队列,对于某些问题,这是不可接受的,因为必须对消息进行排序。这表明,如果您的消息可以是无序的,那么与Queue相比,性能提高了40%以上。

template<class T>
struct Node
{
std::atomic<int> Next;
T *Data;
};
template<class T>
class LockFreeDispatchStackMPMC
{
public:
LockFreeDispatchStackMPMC()
{
Head = NULL;
}
~LockFreeDispatchStackMPMC(){
}
void Send(T *item)
{
Node<T> * new_node = Cache.Malloc();
new_node->Data=item;
bool done = false;
while(!done)
{
auto head = Head.load();
new_node->Next.store( head);
if( Head.compare_exchange_weak(head,new_node))
{
done = true;
}
}
}
T *Recieve()
{
T *returnValue = NULL;
bool done = false;
while(!done)
{
auto head = Head.load();
if(head == NULL)
{
done=true;
}
else
{
Node<T> * curr = head;
Node<T> *next = curr->Next.load();
if(Head.compare_exchange_weak(head,next))
{
done = true;
returnValue = curr->Data;
curr->Next =NULL;
Cache.Free(curr);
}
}
}
return returnValue;
}
public:
std::atomic<Node<T> *> Head;
private:
LockFreeMemCache<Node<T> > Cache;
};

这是一个基于使用两个列表的对象缓存容器,可用作存储对象的跨线程池。这也允许从墓地读取,因为对象没有被销毁,不应该允许写入。这是队列算法中的一个问题。这也是MPMC安全的,即使每个节点必须一次分配一个。

#define GROW_BY_SIZE 4
template<class T>
class LockFreeCacheMPMC
{
public:
LockFreeCacheMPMC()
{
Head=NULL;
FreeStack=NULL;
AddSomeCache();
}
~LockFreeCacheMPMC()
{
Node<T> *node ,*prev;
bool done = false;
node = Head;
prev = NULL;
while(!done)
{
prev = node;
if(node == NULL)
{
done = true;
}
else
{
node = node->Next.load();
delete prev->Data;
delete prev;
}
}
done = false;
node = FreeStack;
prev = NULL;
while(!done)
{
prev = node;
if(node == NULL)
{
done = true;
}
else
{
node = node->Next.load();
delete prev;
}
}
}
T *Malloc()
{
T *returnValue = NULL;
returnValue=Pop();
while(returnValue==NULL)
{
AddSomeCache();
returnValue=Pop();
}
return returnValue;
}
void Free(T *ptr)
{
Push(ptr);
}
private:
void AddSomeCache()
{
for(int i=0; i < GROW_BY_SIZE; i++)
{
T *tmp = new T();
Push(tmp);
}
}
private:
void Push(T *item)
{
Node<T> * new_node = PopNode(true);
new_node->Data=item;
bool done = false;
while(!done)
{
Node<T>* head = Head.load();
new_node->Next.store(head);
if(Head.compare_exchange_weak(head,new_node))
{
done = true;
}
}
}
T *Pop()
{
T *returnValue = NULL;
bool done = false;
while(!done)
{
Node<T> * curr= Head.load();
if(curr == NULL)
{
done=true;
}
else
{
Node<T> *next = curr->Next.load();
if(Head.compare_exchange_weak(curr,next))
{
done = true;
returnValue = curr->Data;
PushNode(curr);
}
}
}
return returnValue;
}
void PushNode(Node<T> *item)
{
item->Next = NULL;
item->Data = NULL;
bool done = false;
while(!done)
{
Node<T>* fs = FreeStack.load();
item->Next.store(fs);
if(FreeStack.compare_exchange_weak(fs,item))
{
done = true;
}
}
}
Node<T> *PopNode(bool Alloc)
{
Node<T> *returnValue = NULL;
bool done = false;
while(!done)
{
Node<T> *fs = FreeStack.load();
if(fs == NULL)
{
done=true;
}
else
{
Node<T> *next = fs->Next.load();
if(FreeStack.compare_exchange_weak(fs,next))
{
done = true;
returnValue = fs;
}
}
}
if ((returnValue == NULL) &&Alloc )
{
returnValue =new Node<T>();
returnValue->Data = NULL;
returnValue->Next = NULL;
}
return returnValue;
}
std::atomic<Node<T> *> Head;
std::atomic<Node<T> *>FreeStack;
};

这是问题类。它会运行一段时间,但腐败会发生。问题出在Dequeue方法中。节点会从列表中一次修剪一个。节点有可能在每一步都从你下面修剪掉。这会导致节点被修剪掉,需要"删除",但仍有活动线程从节点中读取。该算法应该防止对死节点的任何写入,因为下一个指针在原子上指向节点或为null,但使用缓存池存储节点可以安全地从墓地读取。

template<class T>
class LockFreeQueueMPMC
{
public:
LockFreeQueueMPMC()
{
Head=NULL;
}
~LockFreeQueueMPMC(){
}
void Enqueue(T *item)
{
Node<T> * new_node = Cache.Malloc();
new_node->Data=item;
bool done = false;
while(!done)
{
auto head = Head.load();
new_node->Next.store(head);
if(Head.compare_exchange_weak(head,new_node))
{
done = true;
}
}
}
T *Dequeue()
{
T *returnValue=NULL;
bool done = false;
while(!done)
{
Node<T> *head = Head.load();
if(head == NULL)
{
done = true;
}
else
{
Node<T> * prev, *curr;
prev = NULL;
curr = head;
bool found = false;
while(!found)
{
if(curr == NULL)
{
break;
}
Node<T> * next = curr->Next.load();
if(next == NULL)
{
found=true;
break;
}
prev = curr;
curr = next;
}
if(found)
{
if(prev == NULL)
{
if(Head.compare_exchange_weak(head,NULL))
{
done = true;
}
}
else
{
if(prev->Next.compare_exchange_weak(curr,NULL))
{
done = true;
}
}
if(done)
{
returnValue = curr->Data;
Cache.Free(curr);
}
}
}
}
return returnValue;
}
private:
std::atomic<Node<T> *> Head;
LockFreeMemCache<Node<T> > Cache;
};

问题出现在Dequeue方法中,有一个步骤会损坏,但很少。

我建议您使用工具来帮助您,因为这不是一个简单易懂的代码。我不知道你使用的编译器,因为我是一名Linux开发人员,所以我可以建议以gcc为例——可以使用Thread消毒程序。这是相当快的,有可能你能够复制并赶上比赛条件:

如何使用gcc v4.8.1的线程消毒剂?

最新更新