C++中的保序memcpy



我正在开发一个多核、多线程软件库,我想在其中提供更新顺序保护无锁共享内存对象,这些对象可能跨越多个缓存行。

具体地说,假设我有一些缓存行大小的对象的向量X:X[0],…X[K]每个正好占用一个缓存行。我按照索引顺序给它们写:首先是X[0],然后是X[1],等等。如果线程2读取X[K],它是否也会看到X[0]的状态"至少和它看到的X[K'一样当前"?

从同一个线程中,显然我将看到尊重更新顺序的内存语义。但现在,如果第二个线程读取X[K],那么问题就出现了:对X[0]的相应更新。。。是否观察到X[K-1]?

通过锁定,我们确实得到了这一保证。但是,由于memcpy用于将某些内容复制到向量中,我们失去了这个属性:memcpy具有POSIX语义,它根本不能保证索引顺序更新、内存顺序更新或任何其他顺序。您只需保证在memcpy完成后,整个更新都已执行。

我的问题是:是否已经有一个速度相似但具有所需保证的订单保存memcpy?如果没有,这样的原语可以在没有锁定的情况下实现吗?

假设我的目标平台是x86和ARM

(编者按:原来是说英特尔,所以OP可能不关心AMD。)

您描述的订购需求正是发布/获取语义所提供的。(http://preshing.com/20120913/acquire-and-release-semantics/)。

问题是,在所有x86和某些ARM上,有效保证原子加载/存储的原子性单位最多为8字节。否则,其他ARM上只有4个字节(为什么在x86上,自然对齐变量上的整数赋值是原子的?)。在实践中,一些英特尔CPU可能有32甚至64字节(AVX512)的原子存储,但英特尔和AMD都没有做出任何官方保证。

当SIMD矢量存储可能将一个宽对齐的存储分解为多个8字节对齐的块时,我们甚至不知道它们是否有保证的顺序。或者即使这些块是单独的原子。向量加载/存储和聚集/分散的每个元素原子性?有充分的理由相信它们是每个元素的原子,即使文档不能保证。

如果拥有大的"对象"对性能至关重要,你可以考虑在你关心的特定服务器上测试向量加载/存储原子性,但你完全可以自己保证并让编译器使用它,捕捉像SSE指令这样的情况:哪些CPU可以执行原子16B内存操作?由于K10 Opteron上套接字之间的HyperTransport,导致8字节边界撕裂。这可能是一个非常糟糕的主意;在极少数情况下,即使在通常看起来是原子的情况下,如果任何微体系结构条件都能使宽矢量存储成为非原子的,你也猜不到会发生什么。


您可以轻松地对
alignas(64) atomic<uint64_t> arr[1024];这样的数组元素进行发布/获取排序
你只需要很好地询问编译器:

copy_to_atomic(std::atomic<uint64_t> *__restrict dst_a, 
const uint64_t *__restrict src, size_t len) {
const uint64_t *endsrc = src+len;
while (src < src+len) {
dst_a->store( *src, std::memory_order_release );
dst_a++; src++;
}
}

在x86-64上,它不会自动向量化或其他任何操作,因为编译器不会优化原子,也因为没有文档表明使用矢量存储原子元素数组的连续元素是安全的(所以这基本上很糟糕。请在Godbolt编译器资源管理器上查看

我会考虑使用volatile __m256i*指针(对齐的加载/存储)和像atomic_thread_fence(std::memory_order_release)这样的编译器屏障来防止编译时重新排序。每个元素的排序/原子性应该是可以的(但同样不能保证)。当然,不要指望整个32个字节都是原子字节,只需要在较低的uint64_t元素之后写入较高的uint64_t元素(这些存储对其他核心来说是可见的)。


在ARM32上:即使是uint64_t的原子存储也不太好。gcc使用ldrexd/strexd对(LL/SC),因为显然没有8字节的原子纯存储。(我用gcc7.2-O3-march=armv7-a编译。在AArch32模式下,armv8-a的存储对是原子的。当然,AArch64也有原子的8字节加载/存储。)


必须避免使用普通的C库memcpy实现在x86上,它可以使用弱顺序存储来进行大拷贝,从而允许在自己的存储之间重新排序(但不能使用不属于memcpy的后续存储,因为这可能会破坏后续发布的存储。)

movnt缓存绕过矢量循环中的存储,或者具有ERMSB功能的CPU上的rep movsb都可能产生这种效果。"英特尔内存模型"是否使SFENCE和LFENCE成为冗余?。

或者,memcpy实现可以简单地选择在进入其主循环之前先执行最后一个(部分)向量。

在C和C++中,在UB中的非atomic类型上并发写+读或写+写;这就是为什么memcpy可以自由地做任何它想做的事情,包括使用弱有序存储,只要它在必要时使用sfence,以确保memcpy作为一个整体尊重编译器在为以后的mo_release操作发出代码时所期望的顺序。

(也就是说,当前x86的C++实现执行std::atomic时,假设没有弱序存储可供它们担心。任何希望其NT存储尊重编译器生成的atomic<T>代码的顺序的代码都必须使用_mm_sfence()。或者,如果手动编写asm,则直接使用sfence指令。或者,只要使用xchgasm也具有atomic_thread_fence(mo_seq_cst)的作用。)

我发现Peter Cordes对这个问题的回答富有洞察力、详细且非常有用。然而,我没有看到他的建议被写入代码中,所以为了子孙后代和未来需要快速解决DMA或无锁算法需要有序写入的问题的人,我将包括我基于这个答案编写的代码。我在x64和armv7-a上使用gcc 4.9构建了它,尽管我只在x64上运行并测试了它。

#include <atomic>
#include <stdlib.h>
#include <algorithm> // min
extern "C" {
static void * linear_memcpy_portable(void *__restrict dest, const void *__restrict src, size_t n)
{
// Align dest if not already aligned
if ((uintptr_t)dest & sizeof(uint64_t)) {
uint8_t *__restrict dst8 = reinterpret_cast<uint8_t *__restrict>(dest);
const uint8_t *__restrict src8 = reinterpret_cast<const uint8_t *__restrict>(src);
const size_t align_n = std::min(n, (uintptr_t)dest & sizeof(uint64_t));
const uint8_t * const endsrc8 = static_cast<const uint8_t * const>(src) + align_n;
while (src8 < endsrc8) {
*dst8 = *src8;
atomic_thread_fence(std::memory_order_release);
dst8++; src8++;
}
dest = dst8;
src = src8;
n = n - align_n;
}
typedef uint64_t __attribute__((may_alias,aligned(1))) aliasing_unaligned_uint64_t;
uint64_t *__restrict dst64 = static_cast<uint64_t *__restrict>(dest);
const aliasing_unaligned_uint64_t *__restrict src64 = static_cast<const aliasing_unaligned_uint64_t *__restrict>(src);
const uint64_t * const endsrc64 = src64 + n / sizeof(uint64_t);
const uint8_t * const endsrc8 = static_cast<const uint8_t * const>(src) + n;
while (src64 < endsrc64) {
*dst64 = *src64;
atomic_thread_fence(std::memory_order_release);
dst64++; src64++;
}
if (reinterpret_cast<const uint8_t * const>(endsrc64) != endsrc8) {
uint8_t *__restrict dst8 = reinterpret_cast<uint8_t *__restrict>(dst64);
const uint8_t *__restrict src8 = reinterpret_cast<const uint8_t *__restrict>(src64);
while (src8 < endsrc8) {
*dst8 = *src8;
atomic_thread_fence(std::memory_order_release);
dst8++; src8++;
}
}
return dest;
}
#if (_M_AMD64 || __x86_64__)
#include <immintrin.h>
static void * linear_memcpy_avx2(void *dest, const void * src, size_t n) __attribute__((target("avx2")));
static void * linear_memcpy_avx2(void *dest, const void * src, size_t n)
{
__m256i *__restrict dst256 = static_cast<__m256i *__restrict>(dest);
const __m256i *__restrict src256 = static_cast<const __m256i *__restrict>(src);
const __m256i * const endsrc256 = src256 + n / sizeof(__m256i);
const uint8_t * const endsrc8 = static_cast<const uint8_t *>(src) + n;
while (src256 < endsrc256) {
_mm256_storeu_si256(dst256, _mm256_loadu_si256(src256));
atomic_thread_fence(std::memory_order_release);
dst256++; src256++;
}
if (reinterpret_cast<const uint8_t * const>(endsrc256) != endsrc8)
linear_memcpy_portable(dst256, src256, endsrc8 - reinterpret_cast<const uint8_t * const>(endsrc256));
return dest;
}
static void * linear_memcpy_sse2(void *dest, const void * src, size_t n) __attribute__((target("sse2")));
static void * linear_memcpy_sse2(void *dest, const void * src, size_t n)
{
__m128i *__restrict dst128 = static_cast<__m128i *__restrict>(dest);
const __m128i *__restrict src128 = static_cast<const __m128i *__restrict>(src);
const __m128i * const endsrc128 = src128 + n / sizeof(__m128i);
const uint8_t * const endsrc8 = static_cast<const uint8_t *>(src) + n;
while (src128 < endsrc128) {
_mm_storeu_si128(dst128, _mm_loadu_si128(src128));
atomic_thread_fence(std::memory_order_release);
dst128++; src128++;
}
if (reinterpret_cast<const uint8_t * const>(endsrc128) != endsrc8)
linear_memcpy_portable(dst128, src128, endsrc8 - reinterpret_cast<const uint8_t * const>(endsrc128));
return dest;
}
static void *(*resolve_linear_memcpy(void))(void *, const void *, size_t)
{
__builtin_cpu_init();
// All x64 targets support a minimum of SSE2
return __builtin_cpu_supports("avx2") ? linear_memcpy_avx2 : linear_memcpy_sse2;
}
#ifdef __AVX2__
// IF AVX2 is specified to the compiler, alias to the avx2 impl so it can be inlined
void * linear_memcpy(void *, const void *, size_t) __attribute__((alias("linear_memcpy_avx2")));
#else
void * linear_memcpy(void *, const void *, size_t) __attribute__((ifunc("resolve_linear_memcpy")));
#endif
#else
void * linear_memcpy(void *, const void *, size_t) __attribute__((alias("linear_memcpy_portable")));
#endif
} // extern "C"

我欢迎对实施情况的任何反馈。:)

相关内容

  • 没有找到相关文章

最新更新