锁定免费的C++数据结构,不可能



我真的不明白如何让一些数据结构无锁。例如,如果你有一个链表,那么要么用互斥体包围操作,要么如果另一个线程在你忙于将新节点重新链接在一起时执行,你可能会出现竞争条件。

"无锁"的概念(我理解它不是指"没有锁",而是指线程可以在不等待其他线程完成的情况下进行)根本没有意义。

有人能给我看一个使用堆栈、队列或链表等的简单例子吗?它被实现为"无锁",因为我不明白如何在不干扰其他线程生产力的情况下防止竞争条件?这两个目标肯定是矛盾的吗?

无锁数据结构使用原子操作,可能会带来额外的要求。例如,数据结构可能只对一个读写器线程或任何其他组合是安全的。在简单链表的情况下,将使用对节点指针的原子读取和写入,以确保多个线程可以同时安全地对其进行读取和写入。

你可能会也可能不会逃脱惩罚。如果您需要对数据结构和验证的内容提供额外的保证,那么如果没有某种形式的高级锁定,您可能无法做到这一点。此外,并不是每个数据结构都允许重写为无锁,即使考虑到对如何使用数据结构的额外要求也是如此。在这种情况下,不可变对象可能是一种解决方案,但由于复制,它们通常会带来性能损失,这并不总是希望锁定对象然后对其进行更改

我发现简单且可解释的是,首先可以为基于锁(互斥)的数据结构编写伪代码,然后尝试查看如何使用CAS操作以无锁的方式修改您持有锁的变量。尽管其他人给出了很好的答案,但我想补充一点,只有当你自己实现时,你才能感受到它,当然,通过阅读它发表在某篇研究论文中的一些伪代码。

下面是我在C++中实现的一个队列,它对多线程运行进行了验证测试:

#include<iostream>
#include<atomic>
#include<thread>
#include<vector>
#define N 1000
using namespace std;
class lf_queue
{
private:
struct node
{   int data;
atomic<node*> next;
node(int d):data(d)
{}
};
atomic<node*> Head;
atomic<node*> Tail;
public:
lf_queue()
{
node *nnode= new node(-1);
nnode->next=NULL;
Head=nnode;
Tail=nnode;
}
void enqueue(int data)
{
node *nnode= new node(data);
nnode->next=NULL;
node *tail,*next_p;
while(true)
{
tail=Tail.load();
next_p=tail->next;
if(tail==Tail.load())
{
if(next_p==NULL)
{
if((tail->next).compare_exchange_weak(next_p,nnode))
break;
}
else
{
Tail.compare_exchange_weak(tail,next_p);
}
}
}
Tail.compare_exchange_weak(tail,nnode);
}
bool dequeue(int &res)
{
while(true)
{
node *head,*tail,*next_p;
head=Head.load();
tail=Tail.load();
next_p=head->next;
if(head==Head.load())
{
if(head==tail)
{
if(next_p==NULL)
return false;
Tail.compare_exchange_weak(tail,next_p);
}
else
{
res=next_p->data;
if(Head.compare_exchange_weak(head,next_p))
break;
}
}
}//end loop
return true;
}
};
void producer(lf_queue &q)
{   //cout<<this_thread::get_id()<<"Inside producern";
for(int i=0;i<N;i++)
{
q.enqueue(1);
}
//cout<<this_thread::get_id()<<" "<<"Finished producingn";
}
void consumer(lf_queue &q,atomic<int>& sum)
{   //cout<<this_thread::get_id()<<" "<<"Inside consumern";
for(int i=0;i<N;i++)
{
int res=0;
while(!q.dequeue(res));
sum+=res;
}
//cout<<this_thread::get_id()<<" "<<"Finished consumingn";
}
int main()
{
lf_queue Q;
atomic<int> sum;
sum.store(0);
vector<thread> thread_pool;
for(int i=0;i<10;i++)
{   if(i%2==0)
{   thread t(consumer,ref(Q),ref(sum));
thread_pool.push_back(move(t));
}
else
{
thread t(producer,ref(Q));
thread_pool.push_back(move(t));    
}
}
for(int i=0;i<thread_pool.size();i++)
thread_pool[i].join();
cout<<"Final sum "<<sum.load()<<"n";
return 0;
}

我尝试使用Harris的论文实现无锁链表,但遇到了复杂的问题,你可以看到C++11风格,你只能在atomic<gt;类型以及这些原子<节点*>不能为了比特窃取的目的而被强制转换为long,Harris的实现使用比特窃取来逻辑标记已删除的节点。然而,互联网上有一些C语言的代码实现,它们使用低级别的cas_ptr操作,这为在地址和long之间转换提供了更大的灵活性。

有不同的基元可用,允许构建这样的无锁数据结构。例如,compare-and-swap(简称CAS)以原子方式执行以下代码:

CAS(x, o, n)
if x == o:
x = n
return o
else:
return x

通过此操作,您可以执行原子更新。例如,考虑一个非常简单的链表,它按排序顺序存储元素,允许您插入新元素并检查元素是否已经存在。find操作将像以前一样工作:它将遍历所有链接,直到找到元素或找到比查询更大的元素。插入时需要更加小心。它的工作原理如下:

insert(lst, x)
xn = new-node(x)
n = lst.head
while True:
n = find-before(n, x)
xn.next = next = n.next
if CAS(n.next, next, x) == next:
break

CCD_ 1只是在顺序中查找在CCD_ 2之前的元素。当然,这只是一个草图。一旦你想支持删除,事情就会变得更加复杂。我推荐Herlihy和Shavit的《多处理器编程的艺术》。我还应该指出,切换实现相同模型的数据结构,使其无锁定,通常是有利的。例如,如果您想实现等效的std::map,那么使用红黑树会很痛苦,但跳过列表更易于管理。

无锁结构使用原子指令来获取资源的所有权。原子指令锁定它在CPU缓存级别工作的变量,这可以确保其他内核不会干扰操作。

假设你有这些原子指令:

  • 读取(A)->A
  • compare_and_swap(A,B,C)->oldA=A;如果(A==B){A=C};返回oldA

使用这些指令,您可以简单地创建一个堆栈:

template<typename T, size_t SIZE>
struct LocklessStack
{
public:
LocklessStack() : top(0)
{
}
void push(const T& a)
{
int slot;
do
{
do
{
slot = read(top);
if (slot == SIZE)
{
throw StackOverflow();
}
}while(compare_and_swap(top, slot, slot+1) == slot);
// NOTE: If this thread stop here. Another thread pop and push
//       a value, this thread will overwrite that value [ABA Problem].
//       This solution is for illustrative porpoise only
data[slot] = a;
}while( compare_and_swap(top, slot, slot+1) == slot );
}
T pop()
{
int slot;
T temp;
do
{
slot = read(top);
if (slot == 0)
{
throw StackUnderflow();
}
temp = data[slot-1];
}while(compare_and_swap(top, slot, slot-1) == slot);
return temp;
}
private:
volatile int top;
T data[SIZE];
};

volatile是必需的,所以编译器在优化过程中不会打乱操作顺序。同时发生两次推送:

第一个进入while循环和读取槽,然后第二个推送到达,读取顶部,比较和交换(CAS)成功并递增顶部。另一个线程唤醒,CAS失败并读取另一个时间顶部。。

同时发生两次弹出:

与之前的情况非常相似。还需要读取值。

一次弹出和推送同时发生:

pop读取顶部,读取temp…push输入并修改顶部,然后推送一个新值。Pop CAS失败,Pop()将再次执行循环并读取新值

按下读取顶部并获取一个插槽,弹出输入并修改顶部值。推CAS失败,不得不再次循环推低指数。

显然,在并发环境中这不是真的

stack.push(A);
B = stack.pop();
assert(A == B); // may fail

因为推是原子的,而弹出是原子的——它们的组合不是原子的。

游戏编程gem6的第一章是一个很好的参考。

请注意,代码没有经过测试,原子可能真的很糟糕。

假设一个简单的操作,将变量递增一。如果您使用"将变量从内存读取到cpu,将1添加到cpu寄存器,将变量写回"来实现这一点,那么您必须在整个过程中放置某种互斥,因为您希望确保第二个线程在第一个线程写回变量后的之前不会读取变量。

如果您的处理器具有原子"增量内存位置"汇编指令,则不需要锁。

或者,假设您想将一个元素插入到链表中,这意味着您需要使起始指针指向新元素,然后使新元素指向上一个第一个元素。使用原子"交换两个内存单元"操作,可以将当前开始指针写入新元素的"下一个"指针,然后交换两个指针——现在,根据哪个线程首先运行,元素在列表中的顺序不同,但列表数据结构保持不变。

基本上,它总是关于"在一个原子操作中同时做几件事,这样你就不能把操作分解成可能不会中断的单个部分"。

您对锁自由度的定义是错误的。

锁定自由度允许单个线程挨饿,但保证了系统范围的吞吐量。如果一个算法满足当程序线程运行足够长时,至少有一个线程取得了进展(对于一些合理的进展定义),那么它就是无锁的https://en.wikipedia.org/wiki/Non-blocking_algorithm

这意味着,在多个线程访问数据结构的情况下,只有1个线程将被授予;其余的将无法通过

关于锁自由度,重要的是内存冲突的概率。使用锁保护的数据结构通常比使用原子变量的实现更快,但它不能很好地扩展,碰撞的可能性很小。

示例:多个线程不断地在列表中推送数据。这将导致许多冲突,而经典的互斥锁是可以的。但是,如果有1个线程将数据推到列表的末尾,而1个线程在前面弹出数据,情况就不同了。如果列表不为空,那么push_back()和pop_front()不会发生冲突(取决于实现),因为它们不适用于同一对象。但是仍然有一个空列表的更改,所以您仍然需要确保访问安全。在这种情况下,锁定自由将是更好的解决方案,因为您可以同时调用这两个函数,而无需等待。

简而言之:无锁是为大型数据结构设计的,在大型数据结构中,多个写入程序大多是分离的,很少发生冲突。

不久前,我尝试自己实现一个无锁列表容器。。。https://codereview.stackexchange.com/questions/123201/lock-free-list-in-c

这里有一个非常基本的(push_front)无锁列表:

template <class T>
class LockFreeList {    
public:
struct Node {
T value_;
Node* next_;
Node(const T& value) : value_(value), next_(nullptr) {
}
};

void push_front(const T& value) {
Node* new_node = new Node(value);
Node* old_head = head_;
do { new_node->next_ = old_head; }
while (!head_.compare_exchange_weak(old_head, new_node));
}

private:
std::atomic<Node*> head_;
}; 

受Fedor Pikus的CppCon 2017演讲启发,请参阅:https://youtu.be/ZQFzMfHIxng?t=2432

有一个小改动:push_back使用compare_exchange_weak。

最新更新