这里有一个基于基于时代的 Crossbeam 回收的无锁堆栈。
我添加了一些注释来帮助我理解此实现。
#[derive(Debug)]
pub struct TreiberStack<T> {
head: Atomic<Node<T>>,
}
#[derive(Debug)]
struct Node<T> {
data: ManuallyDrop<T>,
next: Atomic<Node<T>>,
}
impl<T> TreiberStack<T> {
pub fn new() -> TreiberStack<T> {
TreiberStack {
head: Atomic::null(),
}
}
pub fn push(&self, t: T) {
let mut n = Owned::new(Node {
data: ManuallyDrop::new(t),
next: Atomic::null(),
});
let guard = epoch::pin();
loop {
let head = self.head.load(Relaxed, &guard); // (1) ’Relaxed’ only provides atomicity
n.next.store(head, Relaxed); // (2) ‘Relaxed’ only provides atomicity
// (2) uses ‘head’ from (1). so (2) and (1)’s partial order won’t be rotated by CPU and compiler
// it seems that compare_and_set behaves just like compare_and_swap.
// It guarantees that all
// memory operations before the RELEASE operation will appear to happen
// before the RELEASE operation with respect to the other components of the system.
// So (1) and (2) always execute before (3)
match self.head.compare_and_set(head, n, Release, &guard) { // (3)
Ok(_) => break,
Err(e) => n = e.new,
}
}
}
pub fn pop(&self) -> Option<T> {
let guard = epoch::pin();
loop {
// It guarantees that all memory
// operations after the ACQUIRE operation will appear to happen after the
// ACQUIRE operation with respect to the other components of the system.
let head = self.head.load(Acquire, &guard); // (4)
match unsafe { head.as_ref() } {
Some(h) => {
let next = h.next.load(Relaxed, &guard); // (5) ’Relaxed’ only provides atomicity
if self
.head
.compare_and_set(head, next, Release, &guard)
.is_ok() // (6)
// This RELEASE matches the ACQUIRE in (4). Code between them won’t be reordered by CPU and compiler
{
unsafe {
guard.defer_destroy(head);
return Some(ManuallyDrop::into_inner(ptr::read(&(*h).data)));
}
}
}
None => return None,
}
}
}
// Returns `true` if the stack is empty.
pub fn is_empty(&self) -> bool {
let guard = epoch::pin();
self.head.load(Acquire, &guard).is_null() // (7)
}
}
impl<T> Drop for TreiberStack<T> {
fn drop(&mut self) {
while self.pop().is_some() {}
}
}
我的问题是:我可以用"放松"替换 (7( 的Acquire
吗?似乎 (7( 处的原子性足以使其工作。Acquire
通常与Release
配对以提供Visibility
:
后 给定变量上的 ACQUIRE,所有内存访问先于任何先前 保证同一变量上的 RELEASE 是可见的。 在其他 单词,在给定变量的关键部分中,所有访问的所有 该变量的先前关键部分保证具有 完成。
Visibility
在此代码中扮演什么角色?看起来代码的排序和原子性是我使这段代码工作所需的唯一保证。如果没有Visibility
另一个线程最终将看到store
的结果。所以代码仍然可以正常工作。
我主要从 Linux 内核的文档中学习了无锁
我的问题是:我可以用"放松"替换 (7( 的获取吗?似乎 (7( 处的原子性足以使其工作。
假设我们使用Relaxed
而不是建议的Acquire
。(3( 处的Release
不保证is_empty()
会看到调用push()
的线程的内存访问。对于使用 Treiber 堆栈的使用者线程来说,这可能变得尤为重要,例如
if !my_stack.is_empty() {
let elem = my_stack.pop().unwrap();
// Do things with elem...
} else {
// Do something expensive...
}
[...]另一个线程最终将看到存储的结果。
你是对的。但是,我想尽量减少我做昂贵操作的时间。在上面的代码片段中,我希望is_empty()
和pop()
始终指示我的堆栈处于相同状态。如果方法使用不同的原子排序,我们将失去这种保证,并且我们最终可能会因为is_empty()
尚未同步而完成更昂贵的工作。
此外,Treiber的无锁堆栈可用于多消费者,多生产者的上下文。假设一个线程正在调用push()
,另一个线程正在调用pop()
,第三个线程通过is_empty()
观察堆栈的状态。如果没有Acquire
内存顺序,第三个线程无法保证观察到推送或弹出线程观察到的相同状态。