C 和低级信号量实现



我在考虑如何使用尽可能少的asm代码来实现信号量(而不是二进制)。
如果不使用互斥锁,我还没有成功地思考和编写它,所以到目前为止,我能做的最好的事情是:

全局:

#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
typedef struct
{
atomic_ullong    value;
pthread_mutex_t *lock_op;
bool             ready;
} semaphore_t;
typedef struct
{
atomic_ullong   value;
pthread_mutex_t lock_op;
bool            ready;
} static_semaphore_t;
/* use with static_semaphore_t */
#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true}


功能:

bool semaphore_init(semaphore_t *semaphore, unsigned long long init_value)
{   
if(semaphore->ready) if(!(semaphore->lock_op = 
calloc(1, sizeof(pthread_mutex_t)))) return false;
else                 pthread_mutex_destroy(semaphore->lock_op);   
if(pthread_mutex_init(semaphore->lock_op, NULL))
return false;
semaphore->value = init_value;
semaphore->ready = true;
return true;
}
bool semaphore_wait(semaphore_t *semaphore)
{
if(!semaphore->ready) return false;
pthread_mutex_lock(&(semaphore->lock_op));
while(!semaphore->value) __asm__ __volatile__("nop");
(semaphore->value)--;
pthread_mutex_unlock(&(semaphore->lock_op));
return true;
}
bool semaphore_post(semaphore_t *semaphore)
{
if(!semaphore->ready) return false;
atomic_fetch_add(&(semaphore->value), (unsigned long long) 1);
return true;
}


是否可以只使用几行代码,使用原子内置或直接在程序集中(例如lock cmpxchg)来实现信号量?

看看<semaphore.h>包含的来自<bits/sempahore.h>的sem_t结构,在我看来,它被选择了一条非常不同的路径。。。

typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;




更新:

@PeterCordes提出了一个更好的解决方案,使用原子,不使用互斥,直接对信号量值进行检查。

我仍然想更好地了解在性能方面改进代码的机会,利用内置的暂停函数或内核调用来避免CPU浪费,等待关键资源可用。

如果有一个互斥和非二进制信号量的标准实现来进行比较,那也会很好。
从futex(7)我读到:"Linux内核提供futex("快速用户空间互斥")作为快速用户空间锁定和信号量的构建块。futex是非常基本的,非常适合构建更高级别的锁定抽象,如互斥、条件变量、读写锁、屏障和信号量。">

请参阅我的最简单的信号量实现,它可能有效。它编译并看起来适合x86。我认为对于任何C11的实现来说,这通常都是正确的。


IIRC,只需一个整数就可以实现计数锁(也称为信号量),您可以通过原子操作访问它。这个维基百科链接甚至给出了up/down的算法。您不需要单独的互斥锁。如果atomic_ullong需要一个互斥来支持目标CPU上的原子增量/减量,那么它将包括一个互斥。(在32位x86上可能是这种情况,或者实现使用慢速cmpxchg8而不是快速lock xadd。对于信号量来说,32位计数器真的太小了吗?因为64位原子在32位机器上会更慢。)

<bits/sempahore.h>并集定义显然只是一个具有正确大小的不透明POD类型,而不是实际实现的指示。


正如@David Schwartz所说,除非你是专家,否则在实际使用中实现自己的锁定是愚蠢的。不过,这可能是一种有趣的方式来了解原子操作并了解标准实现中的隐藏内容。请仔细注意他的警告,锁定实现很难测试。您可以在硬件上使用当前版本编译器的代码以及您选择的编译选项来编写适用于测试用例的代码。。。


ready布尔值完全是浪费空间。如果您能够正确地初始化ready标志,以便函数查看它是有意义的,那么您就可以将其他字段初始化为正常的初始状态。

我注意到你的代码还有一些其他问题:

#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true};
static_semaphore_t my_lock = SEMAPHORE_INITIALIZER(1);
// expands to my_lock = = {1, PTHREAD_MUTEX_INITIALIZER, true};;
// you should leave out the = and ; in the macro def so it works like a value

使用动态分配的pthread_mutex_t *lock_op是愚蠢的。使用值,而不是指针。大多数锁定函数都使用互斥锁,因此额外级别的间接操作只会减慢速度。如果记忆和计数器一起放在那里会好得多。互斥锁不需要太多空间。


while(!semaphore->value) __asm__ __volatile__("nop");

我们希望这个循环避免浪费功率和减慢其他线程的速度,甚至避免其他使用超线程共享同一核心的逻辑线程的速度。

nop并没有降低繁忙等待循环的CPU密集度。即使使用超线程,在x86上也可能没有什么不同,因为整个循环体可能仍然适合4个uop,因此在每个时钟的一次迭代中会出现问题,无论其中是否有nopnop不需要一个执行单元,所以至少它没有伤害。这个旋转循环是在持有互斥对象的情况下发生的,这看起来很愚蠢。因此,第一个等待程序将进入这个旋转循环,而之后的等待程序将在互斥对象上旋转。


这是我对信号量的天真实现,只使用C11原子操作

我认为这是一个很好的实现,它实现了非常有限的目标,即正确和小(源代码和机器代码),并且不使用其他实际的锁定原语。我甚至没有尝试解决一些主要领域(例如公平性/饥饿、将CPU分配给其他线程,可能还有其他方面)。

请参阅godbolt上的asm输出:down只有12个x86 insns,up只有2个(包括rets)。Godbolt的非x86编译器(适用于ARM/ARM64/PPC的gcc 4.8)太旧,无法支持C11<stdatomic.h>。(不过他们确实有C++std::atomic)。因此,很遗憾,我无法轻松检查非x86上的asm输出。

#include <stdatomic.h>
#define likely(x)       __builtin_expect(!!(x), 1)
#define unlikely(x)     __builtin_expect(!!(x), 0)
typedef struct {
atomic_int val;   // int is plenty big.  Must be signed, so an extra decrement doesn't make 0 wrap to >= 1
} naive_sem_t;
#if defined(__i386__) || defined(__x86_64__)
#include <immintrin.h>
static inline void spinloop_body(void) { _mm_pause(); }  // PAUSE is "rep nop" in asm output
#else
static inline void spinloop_body(void) { }
#endif
void sem_down(naive_sem_t *sem)
{
while (1) {
while (likely(atomic_load_explicit(&(sem->val), memory_order_acquire ) < 1))
spinloop_body();  // wait for a the semaphore to be available
int tmp = atomic_fetch_add_explicit( &(sem->val), -1, memory_order_acq_rel );  // try to take the lock.  Might only need mo_acquire
if (likely(tmp >= 1))
break;              // we successfully got the lock
else                    // undo our attempt; another thread's decrement happened first
atomic_fetch_add_explicit( &(sem->val), 1, memory_order_release ); // could be "relaxed", but we're still busy-waiting and want other thread to see this ASAP
}
}
// note the release, not seq_cst.  Use a stronger ordering parameter if you want it to be a full barrier.
void sem_up(naive_sem_t *sem) {
atomic_fetch_add_explicit(&(sem->val), 1, memory_order_release);
}

这里的诀窍是val暂时过低是可以的;这只会让其他线程旋转。还要注意,fetch_add是单个原子操作是关键。它返回旧值,因此我们可以在while循环的加载和fetch_add之间检测val何时被另一个线程占用。(注意,我们不需要检查tmp是否等于while循环的负载:如果另一个线程up在负载和fetch_add之间调用信号量,那也没关系。这是使用fetch_ard而不是cmpxchg的好处)。

atomic_load旋转循环只是一个性能优化,而不是让所有的等待程序在val上进行原子读取-修改-写入。(尽管许多服务员试图dec,然后用inc撤销,但让服务员看到锁被解锁可能是非常罕见的)。

一个真正的实现对于更多的平台会有特殊的东西,而不仅仅是x86。对于x86,可能不仅仅是spinloop中的PAUSE指令。这仍然只是一个完全可移植的C11实现的玩具示例。PAUSE显然有助于避免对内存排序的错误猜测,从而使CPU在离开旋转循环后更有效地运行。CCD_ 30与将逻辑CPU交给操作系统以供不同线程运行不同。这也与CCD_ 31参数的正确性和选择无关。

一个真正的实现可能会在经过一定次数的旋转迭代(sched_yield(2),或者更可能是futex系统调用,见下文)后将CPU交给操作系统。也许使用x86MONITOR/MWAIT可以更友好地使用超线程;我不确定。我从来没有真正实现过锁定自己,我只是在查找其他insn时在x86insn引用中看到了所有这些东西。


如前所述,x86的lock xadd指令实现了fetch_add(具有顺序一致性语义,因为locked指令总是一个完整的内存屏障)。在非x86上,仅对fetch_add使用获取+释放语义,而不是完全的顺序一致性,可能会允许更高效的代码。我不确定,但只使用acquire很可能会在ARM64上实现更高效的代码。我认为我们在fetch_add上只需要acquire,而不是acq_rel,但我不确定。在x86上,代码不会有任何差异,因为locked指令是进行原子读取-修改-写入的唯一方法,所以即使是relaxed也将与seq_cst相同(编译时重新排序除外)


如果你想让CPU而不是旋转,你需要一个系统调用(正如人们所说)。显然,为了使Linux上的标准库锁定尽可能高效,已经做了很多工作。当锁被释放时,有专门的系统调用来帮助内核唤醒正确的线程,而且它们使用起来很简单。来自futex(7):

注意
重申一下,纯futexes并不是最终用户易于使用的抽象。(此没有包装函数系统调用油嘴滑舌)实施者应具备组装知识,并已阅读futex用户空间库的来源以下引用。


公平/饥饿(我的天真实现忽略了这一点)

正如维基百科的文章所提到的,某种唤醒队列是一个好主意,所以同一个线程不会每次都得到信号量。(在释放后快速获取锁的代码通常会让释放线程在其他线程仍处于休眠状态时获取锁)。

这是进程中内核协作的另一个主要好处(futex)。

最新更新