假设我有以下接受票证的自旋锁互斥实现(在C中使用GCC原子内置)。据我所知;释放";解锁功能中的记忆顺序正确。不过,我不确定锁的功能。因为这是一个取票互斥体,所以有一个字段指示要分发的下一个票号,还有一个字段表示当前持有锁的票号。我在票证增量上使用了获取释放,在旋转加载上使用了获得。这是不必要的强烈吗?如果是,为什么?
另外,这两个字段(票证和服务)是否应该间隔开,以便它们位于不同的缓存线上,或者这无关紧要?我主要对arm64和amd64感兴趣。
typedef struct {
u64 ticket;
u64 serving;
} ticket_mutex;
void
ticket_mutex_lock(ticket_mutex *m)
{
u64 my_ticket = __atomic_fetch_add(&m->ticket, 1, __ATOMIC_ACQ_REL);
while (my_ticket != __atomic_load_n(&m->serving, __ATOMIC_ACQUIRE));
}
void
ticket_mutex_unlock(ticket_mutex *m)
{
(void) __atomic_fetch_add(&m->serving, 1, __ATOMIC_RELEASE);
}
更新:根据已接受答案中的建议,我已将实现调整为以下内容。此互斥对象适用于低争用情况。
typedef struct {
u32 ticket;
u32 serving;
} ticket_mutex;
void
ticket_mutex_lock(ticket_mutex *m)
{
u32 my_ticket = __atomic_fetch_add(&m->ticket, 1, __ATOMIC_RELAXED);
while (my_ticket != __atomic_load_n(&m->serving, __ATOMIC_ACQUIRE)) {
#ifdef __x86_64__
__asm __volatile ("pause");
#endif
}
}
void
ticket_mutex_unlock(ticket_mutex *m)
{
u32 my_ticket = __atomic_load_n(&m->serving, __ATOMIC_RELAXED);
(void) __atomic_store_n(&m->serving, my_ticket+1, __ATOMIC_RELEASE);
}
m->ticket
增量只需要是RELAXED
。您只需要每个线程就可以获得不同的票号;它可以根据您的意愿提前或延迟执行wrt。同一线程中的其他操作。
load(&m->serving, acquire)
是命令关键部分的操作,在我们与锁的前一个持有者的unlock
函数中的RELEASE
操作同步之前,阻止这些操作启动。因此m->serving
负载需要至少为acquire
。
即使m->ticket++
直到m->serving
的获取负载之后才完成,也没关系。while
条件仍然确定执行是否(非推测性地)进入关键部分。对关键部分进行推测性执行是很好的,因为这可能意味着它可以更快地准备好提交,从而减少持有锁的时间。
RMW操作的额外排序不会使其在本地或线程间可见性方面更快,并且会减慢线程获取锁的速度。
一条或两条缓存线
就性能而言,我认为具有高争用性的,将成员保留在单独的缓存行中是有好处的
需要独占缓存行以获得票证编号的线程不会与线程解锁.serving
发生冲突,因此这些线程间延迟可以并行发生。
在旋转等待while(load(serving))
循环中有多个内核的情况下,它们可以命中本地L1d缓存,直到某个东西使线路的共享副本无效,而不会产生任何额外的流量。但是,除非使用x86_mm_pause()
之类的东西,否则会浪费大量的功率,还会浪费可以与同一物理上的另一个逻辑核心共享的执行资源。x86pause
在离开自旋循环时也避免了分支预测错误。相关:
- ";暂停";x86中的指令
- x86暂停指令在spinlock*中是如何工作的?它可以在其他场景中使用吗
- 通过内联程序集锁定内存操作
通常建议在两次检查之间进行指数后退,最多停顿一些时间,但在这里我们可以做得更好:在两次检测之间使用许多pause
指令,这些指令与my_ticket - m->serving
成比例,因此您可以在收到罚单时更频繁地进行检查
在争用率非常高的情况下,如果您要等待很长时间,那么回退到操作系统辅助的睡眠/唤醒是合适的,比如Linuxfutex
。或者,如果您的等待间隔超过3或8个票号或其他什么,我们可以看到我们离队伍的最前面有多近,yield
、nanosleep
或futex
。(可根据出票时间进行调整。)
(使用futex
,您可以在解锁中引入m->ticket
的读取,以确定是否有线程正在休眠,等待通知。就像C++20atomic<>.wait()
和atomic.notify_all()
一样。不幸的是,我不知道一个好的方法来确定要通知哪个线程,而不是唤醒他们来检查他们是否是幸运的赢家。
对于低平均争用,您应该将两者保持在同一缓存行中。访问CCD_ 25之后总是紧接着加载CCD_。在未锁定的无争用情况下,这意味着只有一条缓存线在跳动,或者必须保持热状态才能让同一内核获取/释放锁。
如果锁已经被持有,那么想要解锁的线程需要对RMW或存储的缓存行拥有独占所有权。无论另一个核心在包含.serving
的线路上执行RMW还是仅执行纯负载,它都会失去这一点。
不会有太多的情况下,多个服务员都在同一个锁上旋转,新线程得到一个票号会延迟解锁,以及它对等待它的线程的可见性。
无论如何,这是我的直觉;可能很难进行微基准测试,除非缓存未命中原子RMW停止了稍后的加载,甚至无法开始请求稍后的行,在这种情况下,在获取锁时可能会有两个缓存未命中延迟。
在解锁时避免原子RMW
持有锁的线程知道它具有独占所有权,没有其他线程会同时修改m->serving
。如果你让锁的主人记住自己的票号,你可以将解锁优化为只有一家商店。
void ticket_mutex_unlock(ticket_mutex *m, uint32_t ticket_num)
{
(void) __atomic_store_n(&m->serving, ticket_num+1, __ATOMIC_RELEASE);
}
或者没有API更改(从u32 ticket_mutex_lock()
返回一个整数)
void ticket_mutex_unlock(ticket_mutex *m)
{
uint32_t ticket = __atomic_load_n(&m->serving, __ATOMIC_RELAXED); // we already own the lock
// and no other thread can be writing concurrently, so a non-atomic increment is safe
(void) __atomic_store_n(&m->serving, ticket+1, __ATOMIC_RELEASE);
}
这在需要原子RMW的LL/SC重试循环的ISAs上具有很好的效率优势,在这种情况下,可能会发生来自另一个核心读取值的虚假故障。在x86上,唯一可能的原子RMW是一个完整的屏障,甚至比Cseq_cst
语义所需的更强。
BTW,锁定字段将可以作为uint32_t
。您不会有2^32个线程在等待锁定。所以我用了uint32_t
而不是u64
。环绕是定义明确的。即使是像ticket - serving
这样的减法也能起作用,即使跨越了包装边界,就像1 - 0xffffffffUL
一样,也能给出2,所以你仍然可以计算出你离被服务的距离有多近,以便做出睡眠决定。
x86-64没什么大不了的,只节省了一点代码大小,在AArch64上可能根本不是一个因素。但对某些32位ISAs会有很大帮助。