我正在用C实现一个无锁链表,类似于linux内核llist.h中可用的链表。我正在使用"__sync_bool_compare_and_swap"的原子操作。
这是代码狙击:
struct llist_head {
struct llist_node *head;
struct llist_node *tail;
};
struct llist_node {
struct llist_node *next;
int mm_ref;
};
#define LLIST_HEAD_INIT(name) { NULL, NULL }
#define LLIST_HEAD(name) struct llist_head name = LLIST_HEAD_INIT(name)
void llist_insert_tail(struct llist_head *head, struct llist_node *new)
{
volatile struct llist_node *last;
new->mm_ref = 0;
mb();
new->next = NULL;
do {
last = head->tail;
if (last)
last->next = new;
} while(!CAS(&(head->tail), last, new));
CAS(&(head->head), NULL, new);
mb();
}
struct llist_node *llist_remove_head(struct llist_head *head)
{
volatile struct llist_node *first, *next;
do {
first = head->head;
if (first != head->head)
continue;
if (first == NULL)
return 0;
next = first->next;
printf("%s: tid=%lx first=%p next=%p first->next=%pn",
__func__, pthread_self(), first, next, first->next);
} while(!CAS(&(head->head), first, next));
return first;
}
我有一个小型的多线程测试程序,只有 1 个生产者线程将节点插入尾部列表,两个消费者线程从列表头中删除节点。插入/移除功能如下所示:
void *list_insert(void *data)
{
int i;
printf("producer: id=%lxn", pthread_self());
for ( i = 0 ; i < 10 ; i++) {
struct my_list_node *e = malloc(sizeof(struct my_list_node));
e->a = SOMEID;
llist_insert_tail(&global_list, &(e_array[i]->list));
printf("new node p=%p next=%pn", e_array[i], e_array[i]->list.next);
ATOMIC_ADD64(&cn_added, 1);
}
ATOMIC_SUB(&cn_producer, 1);
printf("Producer thread [%lx] exited! Still %d running...n",pthread_self(), cn_producer);
return 0;
}
void *list_remove(void *data)
{
struct llist_node *pos = NULL;
printf("consumer: id=%lxn", pthread_self());
while(cn_producer || !llist_empty(&global_list)) {
struct my_list_node *e;
pos = llist_remove_head(&global_list);
if (pos) {
e = llist_entry(pos, struct my_list_node, list);
printf("tid=%lx removed %pn", pthread_self(), pos);
if (e->a != SOMEID) {
printf("data wrong!!n");
exit(1);
}
ATOMIC_ADD64(&cn_deleted, 1);
} else {
sched_yield();
}
}
printf("Consumer thread [%lx] exited %dn", pthread_self(), cn_producer);
return 0;
}
测试显示一致的失败,例如生产者插入了 10 个节点,但消费者只弹出了 1/2 或 3 个节点,我得到的典型输出之一如下所示:
consumer: id=7f4d469e8700
consumer: id=7f4d461e7700
producer: id=7f4d459e6700
new node p=0x7f4d400008c0 next=(nil)
new node p=0x7f4d400008e0 next=(nil)
new node p=0x7f4d40000900 next=(nil)
new node p=0x7f4d40000920 next=(nil)
new node p=0x7f4d40000940 next=(nil)
new node p=0x7f4d40000960 next=(nil)
new node p=0x7f4d40000980 next=(nil)
new node p=0x7f4d400009a0 next=(nil)
new node p=0x7f4d400009c0 next=(nil)
new node p=0x7f4d400009e0 next=(nil)
Producer thread [7f4d459e6700] exited! Still 0 running...
llist_remove_head: tid=7f4d469e8700 first=0x7f4d400008c0 next=(nil) first->next=(nil)
llist_remove_head: tid=7f4d469e8700 head=(nil)
tid=7f4d469e8700 removed 0x7f4d400008c0
Consumer thread [7f4d469e8700] exited 0
llist_remove_head: tid=7f4d461e7700 first=0x7f4d400008c0 next=(nil) first->next=(nil)
Consumer thread [7f4d461e7700] exited 0
可以看出,生产者线程首先退出,并切换到消费线程,但是,这一行:llist_remove_head: tid=7f4d469e8700 first=0x7f4d400008c0 next=(nil) first->next=(nil)
显示其中一个使用者线程正在尝试删除第一个节点,但它的下一个指针指向 NULL,这种情况不应该是这样,因为到目前为止,列表已完全填充(有 10 个节点(。 所以有竞争条件,这应该会发生,因为这是无锁列表,但我挠了挠头,无法弄清楚什么样的竞争条件会导致这种输出。 此实现类似于 https://github.com/darkautism/lfqueue 但是我不知道为什么我的版本不起作用。
你可以试试 lfqueue
它使用简单,圆形设计无锁
int *ret;
lfqueue_t results;
lfqueue_init(&results);
/** Wrap This scope in multithread testing **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
while (lfqueue_enq(&results, int_data) != 1) ;
/*Dequeue*/
while ( (ret = lfqueue_deq(&results)) == NULL);
// printf("%dn", *(int*) ret );
free(ret);
/** End **/
lfqueue_clear(&results);
有一个 lfstack 与同一个开发人员一起出现。