无锁堆栈,将 is_empty() 中的获取替换为放松



这里有一个基于基于时代的 Crossbeam 回收的无锁堆栈。

我添加了一些注释来帮助我理解此实现。

#[derive(Debug)] 
pub struct TreiberStack<T> { 
    head: Atomic<Node<T>>, 
} 

#[derive(Debug)] 
struct Node<T> { 
    data: ManuallyDrop<T>, 
    next: Atomic<Node<T>>, 
} 

impl<T> TreiberStack<T> { 
    pub fn new() -> TreiberStack<T> { 
        TreiberStack { 
            head: Atomic::null(), 
        } 
    } 

    pub fn push(&self, t: T) { 
        let mut n = Owned::new(Node { 
            data: ManuallyDrop::new(t), 
            next: Atomic::null(), 
        }); 

        let guard = epoch::pin(); 

        loop { 
            let head = self.head.load(Relaxed, &guard);   // (1) ’Relaxed’ only provides atomicity
            n.next.store(head, Relaxed);  // (2) ‘Relaxed’ only provides atomicity
// (2) uses ‘head’ from (1). so (2) and (1)’s partial order won’t be rotated by CPU and compiler
            // it seems that compare_and_set behaves just like compare_and_swap.
            // It guarantees that all
            // memory operations before the RELEASE operation will appear to happen
            // before the RELEASE operation with respect to the other components of the system.
//  So (1) and (2) always execute before (3)
            match self.head.compare_and_set(head, n, Release, &guard) {   // (3)
                Ok(_) => break, 
                Err(e) => n = e.new, 
            } 
        } 
    } 

    pub fn pop(&self) -> Option<T> { 
        let guard = epoch::pin();  
        loop { 
            // It guarantees that all memory
            // operations after the ACQUIRE operation will appear to happen after the
            // ACQUIRE operation with respect to the other components of the system.
            let head = self.head.load(Acquire, &guard);    // (4)

            match unsafe { head.as_ref() } { 
                Some(h) => { 
                    let next = h.next.load(Relaxed, &guard); // (5) ’Relaxed’ only provides atomicity

                    if self 
                        .head 
                        .compare_and_set(head, next, Release, &guard)   
                        .is_ok() // (6)
                    // This RELEASE matches the ACQUIRE in (4). Code between them won’t be reordered by CPU and compiler
                    { 
                        unsafe { 
                            guard.defer_destroy(head);  
                            return Some(ManuallyDrop::into_inner(ptr::read(&(*h).data))); 
                        } 
                    } 
                } 
                None => return None, 
            } 
        } 
    } 

    // Returns `true` if the stack is empty. 
    pub fn is_empty(&self) -> bool { 
        let guard = epoch::pin(); 
        self.head.load(Acquire, &guard).is_null()   // (7)
    } 
} 

impl<T> Drop for TreiberStack<T> { 
    fn drop(&mut self) { 
        while self.pop().is_some() {} 
    } 
} 

我的问题是:我可以用"放松"替换 (7( 的Acquire吗?似乎 (7( 处的原子性足以使其工作。Acquire通常与Release配对以提供Visibility

后 给定变量上的 ACQUIRE,所有内存访问先于任何先前 保证同一变量上的 RELEASE 是可见的。 在其他 单词,在给定变量的关键部分中,所有访问的所有 该变量的先前关键部分保证具有 完成。

Visibility在此代码中扮演什么角色?看起来代码的排序和原子性是我使这段代码工作所需的唯一保证。如果没有Visibility另一个线程最终将看到store的结果。所以代码仍然可以正常工作。

我主要从 Linux 内核的文档中学习了无锁

我的问题是:我可以用"放松"替换 (7( 的获取吗?似乎 (7( 处的原子性足以使其工作。

假设我们使用Relaxed而不是建议的Acquire。(3( 处的Release不保证is_empty()会看到调用push()的线程的内存访问。对于使用 Treiber 堆栈的使用者线程来说,这可能变得尤为重要,例如

if !my_stack.is_empty() {
let elem = my_stack.pop().unwrap();
// Do things with elem...
} else {
// Do something expensive...
}

[...]另一个线程最终将看到存储的结果。

你是对的。但是,我想尽量减少我做昂贵操作的时间。在上面的代码片段中,我希望is_empty()pop()始终指示我的堆栈处于相同状态。如果方法使用不同的原子排序,我们将失去这种保证,并且我们最终可能会因为is_empty()尚未同步而完成更昂贵的工作。

此外,Treiber的无锁堆栈可用于多消费者,多生产者的上下文。假设一个线程正在调用push(),另一个线程正在调用pop(),第三个线程通过is_empty()观察堆栈的状态。如果没有Acquire内存顺序,第三个线程无法保证观察到推送或弹出线程观察到的相同状态。

相关内容

  • 没有找到相关文章

最新更新