下面是一个简化的C程序,它演示了我在英特尔cpu上使用GNU内置的比较和交换实现的并发堆栈时遇到的问题。我花了一段时间才明白发生了什么,但现在我明白了,我发现它完全在原子比较和交换提供的保证范围内。
当一个节点从堆栈中弹出、修改,然后放回堆栈时,修改后的值可能会成为堆栈的新头,从而损坏它。test_get中的注释描述了导致这种情况的事件顺序。
是否有任何方法可以将同一节点与同一堆栈可靠地多次使用?这是一个夸张的例子,但即使将未修改的节点返回到gHead也可能泄漏其他节点。这种数据结构的最初目的是重复使用相同的节点。
typedef struct test_node {
struct test_node *next;
void *data;
} *test_node_p;
test_node_p gHead = NULL;
unsigned gThreadsDone = 0;
void test_put( test_node_p inMemory ) {
test_node_p scratch;
do {
scratch = gHead;
inMemory->next = scratch;
} while ( !__sync_bool_compare_and_swap( &gHead , scratch , inMemory ) );
}
test_node_p test_get( void ) {
test_node_p result;
test_node_p scratch;
do {
result = gHead;
if ( NULL == result ) break;
// other thread acquires result, modifies next
scratch = result->next; // scratch is 0xDEFACED...
// other thread returns result to gHead
} while ( !__sync_bool_compare_and_swap( &gHead , result , scratch ) );
// this thread corrupts gHead with 0xDEFACED... value
if ( NULL == result ) {
result = (test_node_p)malloc( sizeof(struct test_node) );
}
return result;
}
void *memory_entry( void *in ) {
test_node_p node;
int index , count = 1000;
for ( index = 0 ; index < count ; ++index ) {
node = test_get();
*(uint64_t *)(node) |= 0xDEFACED000000000ULL;
test_put( node );
}
__sync_add_and_fetch(&gThreadsDone,1);
return NULL;
}
void main() {
unsigned index , count = 8;
pthread_t thread;
gThreadsDone = 0;
for ( index = 0 ; index < count ; ++index ) {
pthread_create( &thread , NULL , memory_entry , NULL );
pthread_detach( thread );
}
while ( __sync_add_and_fetch(&gThreadsDone,0) < count ) {}
}
我用8个逻辑核心运行这个测试。当我只使用4个线程时,问题很少发生,但使用8个线程很容易重复。我有一台搭载英特尔酷睿i7的MacBook。
我对使用解决这个问题的库或框架不感兴趣,我想知道它是如何解决的。非常感谢。
编辑:
这里有两个解决方案在我的情况下不起作用。
一些体系结构提供ll/sc指令对,它们对地址而不是值执行原子测试。对地址的任何写入,即使具有相同的值,也会阻止成功。
一些体系结构提供大于指针大小的比较和交换。这样,单调计数器与指针配对,指针每次使用时都会原子递增,因此值总是会更改。一些英特尔芯片支持这一点,但没有GNU包装器。
这是关于这个问题的一个一个的问题。两个线程A和B到达test_get
中的某个点,在该点上,它们对result
具有相同的值,而不是NULL
。然后出现以下顺序:
- 线程A通过比较和交换,并从
test_get
返回result
- 线程A修改
result
的内容 - 线程B取消引用
result
,获取放在那里的线程A - 线程A以
result
结束并调用test_put
- 线程A通过
test_put
中的比较和交换,将结果放回gHead
- 线程B到达
test_get
中的比较和交换并通过 - 线程B现在已使用线程A中的值损坏
gHead
这里有一个类似的场景,有三个线程,不需要线程a来修改result
。
- 线程A通过比较和交换,并从
test_get
返回result
- 线程A不修改
result
的内容 - 线程B取消引用
result
,在scratch
中获取有效值 - 线程C使用不相关的值调用
test_put
并成功 - 线程A调用
test_put
并成功将result
放回gHead
- 线程B到达
test_get
中的比较和交换并通过 - 线程B现在已经通过泄漏线程C添加的任何内容损坏了
gHead
在任何一种情况下,问题都是线程A通过两次比较和交换,一次用于get,然后再次用于put,然后线程B到达比较和交换以获得get。线程B为scratch所具有的任何值都应该被丢弃,但这并不是因为gHead中的值看起来是正确的。
任何在没有实际预防的情况下降低这种可能性的解决方案都会使bug更难追踪。
请注意,scratch变量只是在原子指令开始之前,结果的取消引用值所在位置的名称。删除名称会删除可能被中断的取消引用和比较之间的时间片。
还要注意,原子意味着它作为一个整体成功或失败。指针的任何对齐读取在所讨论的硬件上都是隐含的原子。就其他线程而言,不存在只读取了一半指针的可中断点。
永远不要通过简单的求值访问原子变量。此外,对于像您这样的比较和交换循环,我认为__sync_val_compare_and_swap
更方便。
/* read the head atomically */
result = __sync_val_compare_and_swap(&gHead, 0, 0);
/* compare and swap until we succeed placing the next field */
while ((oldval = result)) {
result = __sync_val_compare_and_swap(&gHead, result, result->next);
if (oldval == result) break;
}
(我放弃之前的答案。)
问题是,您没有原子读取gHead
和gHead->next
的机制,但这是实现无锁堆栈所必需的。由于您打算忙于循环来处理比较和交换冲突,因此可以使用相当于旋转锁的
void lock_get () {
while (!_sync_bool_compare_and_swap(&gGetLock, 0, 1)) {}
}
void unlock_get () {
unlock_get_success = _sync_bool_compare_and_swap(&gGetLock, 1, 0);
assert(unlock_get_success);
}
现在,test_get()
中的环路可以被lock_get()
和unlock_get()
包围。test_get()
的CAS循环只是与test_put()
竞争的一个线程。Jens对CAS循环的实现似乎更清晰。
lock_get();
result = __sync_val_compare_and_swap(&gHead, 0, 0);
while ((oldval = result)) {
result = __sync_val_compare_and_swap(&gHead, result, result->next);
if (oldval == result) break;
}
unlock_get();
这实现了这样的意图,即只有一个线程应该从头部弹出。
如果您有一个CAS变量(在您的例子中是gHead)。您必须始终使用CAS来访问它。或者使用锁来保护它。用于阅读和写作。像"result=gHead;"这样的东西是一个很大的禁忌。
重读你的问题,后进先出法就是一堆。实现任何基于CAS的数据结构都是基于只有一件事需要更改。在堆栈顶部的堆栈中。你似乎在做一个链表。我相信有一些很酷的方法可以创建原子链表。
但对于堆栈,像其他人一样,只执行一个堆栈指针:)