我正在阅读ANTHONY WILLIAM的 C 并发。第7章介绍了开发无锁堆栈的过程,并说明了使无锁编程变得困难的常见问题。具体而言,第7.2.3节(检测无法使用危险指针无法回收的节点)描述了如何使用危险指针来避免数据竞赛,并确保其他线程不在由另一个线程引用。
此代码是该章中说明的pop()
的迭代之一:
std::shared_ptr<T> pop()
{
std::atomic<void*>& hp = get_hazard_pointer_for_current_thread();
node* old_head = head.load();
do
{
node* temp;
do
{
temp = old_head;
hp.store(old_head);
old_head = head.load();
} while(old_head != temp);
}
while(old_head &&
!head.compare_exchange_strong(old_head,old_head->next));
hp.store(nullptr);
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data);
if(outstanding_hazard_pointers_for(old_head))
{
reclaim_later(old_head);
}
else
{
delete old_head;
}
delete_nodes_with_no_hazards();
}
return res;
}
我对这个片段有疑问:
if(outstanding_hazard_pointers_for(old_head))
{
reclaim_later(old_head);
}
else
{
delete old_head;
}
危险指针的目的是确保在没有其他线程仍在使用它时删除old_head
。outstanding_hazard_pointers_for
的建议实现如下:
unsigned const max_hazard_pointers=100;
struct hazard_pointer
{
std::atomic<std::thread::id> id;
std::atomic<void*> pointer;
};
hazard_pointer hazard_pointers[max_hazard_pointers];
bool outstanding_hazard_pointers_for(void* p)
{
for(unsigned i=0; i < max_hazard_pointers; ++i)
{
if(hazard_pointers[i].pointer.load() == p)
{
return true;
}
}
return false;
}
基本上,扫描了一系列危险指针,以检查是否存在通往节点的指针。我想知道为什么这项操作确实是安全的。执行原子load()
,即使使用顺序一致的排序,load()
也可能会加载一个陈旧的值。结果,可能找不到p
,pop()
将删除仍在使用的节点。
想象以下发生的情况:
线程A开始执行
pop()
,并在执行之前被抢占 Just :while(old_head && !head.compare_exchange_strong(old_head,old_head->next));
螺纹A因此将当前头视为
delete
0,将其保存到危险指针中。当线程醒来并试图弹出调用head.compare_exchange_strong(old_head, old_head->next)
。线程B开始调用
pop()
降低到if(outstanding_hazard_pointers_for(old_head))
old_head
将是堆栈的当前头,那是线程a引用为old_head
的节点。线程b将不是delete old_head
iff aload()
在线程A危险指针上返回线程A。
的最新值
基本上:我想知道线程B是否可以load()
陈旧值而不是最新值。换句话说,我不确定为什么有返回线程A设置的值(old_node
)。
这种推理的缺陷在哪里?我找不到关于为什么hp.store(old_head)
在hazard_pointers[i].pointer.load()
之前发生的CC_20的理由。
我要回答自己的问题,原因有两个:我认为我接受的答案不是很清楚,而JJ15K的评论证实了印象。
基本上,关键是要观察到,要通过if(outstanding_hazard_pointers_for(old_head))
和看到另一个线程,看到另一个线程在执行while(old_head && !head.compare_exchange_strong(old_head, old_head->next))
之前先抢占的old_head
,它必须使用相同的old_head
执行head.compare_exchange_strong(old_head, old_head->next)
。但是(假设<
表明在关系之前发生):
thread A: hp.store(old_head) <
thread A: old_head = head.load() <
thread B: head.compare_exchange_strong(old_head, old_head->next)
请记住,线程B正在看到我们在第一个指令中加载的相同 old_head
,并且将其值交换为 old_head->next
。我们仍然在 head.load()
中看到相同的值,这就是为什么螺纹a hp.store(old_head)
发生在线程b compare_exchange_strong
之前。
因此,即将检查危险指针中包含的头的线程可以删除以查看old_head
。还要注意old_head = head.load()
扮演的基本角色(以及包含一见一见钟情的陈述的循环)。没有该load
操作,old_head
的store
与hp
和compare_exchange_strong
之间的CC_36之间不会发生。
我希望这可以回答您的问题。
我对代码的理解如下。
如果在此线程中调用hazard_pointers[i].pointer.load()
之前,另一个线程中的hp.store(old_head)
在其他线程中不发生,则意味着此线程成功执行了head.compare_exchange_strong(old_head,old_head->next)
调用。这意味着对于另一个线程old_head != temp
,因此它将执行另一种尝试将适当的old_head
作为线程hp
的尝试。
这意味着可以安全删除当前线程中的原始old_head
指针,因为它实际上不是由另一个线程使用的。