"Lock free"一次写入、经常读取的值与线程池结合使用



我面临以下问题:

我的程序使用了";任务";,其中存在代码(函数)的任务,它们共同构成了一个关键区域。每个任务(对象)都知道它的";键";是,但该密钥可能与其他任务共享。";键";(指向另一个资源的指针,下面称为B)在任务的生存期内不会更改。然而,在我创建第二个资源之前,它必须存在。

为了澄清,从任务的角度来看:

  1. 创建资源A,如果该资源还不存在
  2. 一旦我们有了A,就创建资源B(A),如果它还不存在的话

其中B是a的函数:每个a只能创建一个B。存在多个这样的任务,有些任务可能会创建不同的A,或者参见他们的A已经存在,并且使用与其他任务相同的A。

因此,B的创建需要它自己的互斥:所有任务都并行运行,但只应创建一个B(每个A)。

然而,一旦创建了B,我就不想锁定那个互斥对象每次——以防它仍在创建中。创建只发生一次,读取将在程序的其余执行过程中经常发生。

因此,程序流程如下:

class Application {
std::map<A*, B> all_B;
std::mutex all_B_mutex;
B* B_broker(A*);
};
struct A {
std::atomic<B*> cached_B_ptr;
};
  • 每个任务一旦有A,就会调用B_broker(A)-一次。这个函数将锁定互斥并创建B,如果这还不存在,并返回一个指向新B.的指针

  • B_broker(A)获得非NULL指针的线程将该B*存储在成员变量a:A::cached_B_ptr中。

  • 如果B(A)已经存在,则B_broker(A)将返回nullptr。调用线程然后等待,直到它看到A::cached_B_ptr不是null。

  • A::cached_B_ptr的所有访问都使用memory_order_relaxed,因此,在没有锁的情况下只读取正常变量没有区别(因此标题中的"无锁")。在程序的正常操作过程中,此变量只被读取,因此不存在争用。

尽管给定的任务在其自己的关键区域内运行,但不同的任务可能共享相同的a,因此对A::cached_B_ptr的访问不受互斥(或者,无论如何都是相同的互斥)保护。

现在的问题是:这能安全地完成吗?其中,

  • 线程A写入A::cached_B_ptr(松弛)-没有相关的互斥
  • 线程B运行一个与a无关的任务,并希望使用A::cached_B_ptr。B不写入A::cached_B_ptr(B_broker(A)返回nullptr),但由于A::cached_B_ptr不受互斥体保护,也不能依赖于已设置的互斥体;所以B等待直到它读取到一个非空值
  • 线程C继续运行B的任务,也希望使用A::cached_B_ptr,但这次没有任何检查或等待。B所做的是每任务一次的操作,仅在任务启动时完成,并且仅在程序启动后不久(当a写入A::cached_B_ptr时仍可能存在竞争时)完成

PS以下两个问题可能相关:

  • 写一次读多次锁
  • 无锁和无等待线程安全延迟初始化

我使用以下程序使用genmc对此进行了测试:

#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <assert.h>
void __VERIFIER_assume(int);
atomic_int x;
atomic_int order;
atomic_int c;
pthread_mutex_t mutex;
void* thread_A(void* arg)
{
atomic_store_explicit(&x, 12345, memory_order_relaxed);
return NULL;
}
void* thread_B(void* arg)
{
pthread_mutex_lock(&mutex);
// Read x and write it to b.
int read_value = atomic_load_explicit(&x, memory_order_relaxed);
__VERIFIER_assume(read_value == 12345);
// Assume B runs before C.
int o = atomic_load_explicit(&order, memory_order_relaxed);
__VERIFIER_assume(o == 0);
pthread_mutex_unlock(&mutex);
return NULL;
}
void* thread_C(void* arg)
{
pthread_mutex_lock(&mutex);
// Read x and write it to c.
int read_value = atomic_load_explicit(&x, memory_order_relaxed);
atomic_store_explicit(&c, read_value, memory_order_relaxed);
// Keep track of the order in which the threads B and C run.
atomic_store_explicit(&order, 1, memory_order_relaxed);
pthread_mutex_unlock(&mutex);
return NULL;
}
int main()
{
pthread_t tA;
pthread_t tB;
pthread_t tC;
if (pthread_create(&tA, NULL, thread_A, NULL))
abort();
if (pthread_create(&tB, NULL, thread_B, NULL))
abort();
if (pthread_create(&tC, NULL, thread_C, NULL))
abort();
if (pthread_join(tA, NULL))
abort();
if (pthread_join(tB, NULL))
abort();
if (pthread_join(tC, NULL))
abort();
printf("c = %dn",
atomic_load_explicit(&c, memory_order_relaxed));
return 0;
}

注意,genmc仅用于C代码;但是它使用的内存模型与C++的内存模型相同。它在LLVM中间表示级别验证内存模型所基于的虚拟机的可能执行,语法基于pthread应该无关紧要。

测试中的代码段运行一个线程a,该线程向x写入一些值,该值对应于我们的std::atomic<B*> A::cached_B_ptr

void* thread_A(void* arg)
{
atomic_store_explicit(&x, 12345, memory_order_relaxed);
return NULL;
}

这是一个没有任何锁定的轻松写入。

然后,一个独立的线程B读取x,并循环直到它读取一个非NULL值。或者,相反,我们读一次,然后告诉genmc,我们假设它读A写的值:

int read_value = atomic_load_explicit(&x, memory_order_relaxed);
__VERIFIER_assume(read_value == 12345);

线程C扮演着运行B在完成初始化后启动的任务的角色(调用B_broker(A),看到它返回nullptr,然后读取A::cached_B_ptr,直到返回非NULL值)。因此,我们只对C在B之后运行的代码段的执行感兴趣。这是通过在线程C中将原子order设置为1来实现的,并在线程B:中验证/假设它仍然为0

// Assume B runs before C.
int o = atomic_load_explicit(&order, memory_order_relaxed);
__VERIFIER_assume(o == 0);

最后,在线程C中,我们读取x,以验证它始终与A所写的内容同步,即使与线程A完全没有同步,并且对x的所有访问都是放松的。这是通过读取C中的x并将其写入全局变量(c)以供稍后检查来完成的。之后CCD_ 30被设置为1。

// Read x and write it to c.
int read_value = atomic_load_explicit(&x, memory_order_relaxed);
atomic_store_explicit(&c, read_value, memory_order_relaxed);
// Keep track of the order in which the threads B and C run.
atomic_store_explicit(&order, 1, memory_order_relaxed);

程序的输出为:

c = 12345
No errors were detected.
Number of complete executions explored: 1
Number of blocked executions seen: 2
Total wall-clock time: 0.01s

换句话说,在B在C和B看到A写入的值之前运行的设置限制内,C总是看到相同的值。如果不是这样的话,c = 0也会被打印出来,并且完整执行的数量会大于1。