我有两个大小为N
的缓冲区。我想在不使用锁的情况下从不同的线程写入缓冲区
我维护一个缓冲区索引(0和1)和一个偏移量,在这个偏移量中,对缓冲区的新写入操作开始。如果我能得到当前的偏移量,并以原子的方式将偏移量设置为offset + len_of_the_msg
,这将保证不同的线程不会相互覆盖。我还必须处理缓冲区溢出的问题。一旦缓冲区已满,请切换缓冲区并将偏移量设置为0。
按顺序执行的任务:
-
设置
a = offset
-
用
msg_len
增加offset
-
if offset > N:
切换缓冲区,将a
设置为0,将offset
设置为msg_len
我正在C中实现这一点。编译器是gcc。
如何在不使用锁的情况下以原子方式执行此操作?有可能这样做吗?
编辑:
我不需要使用2个缓冲区。我想做的是";将来自不同线程的日志消息收集到缓冲区中,并且一旦达到某个缓冲区使用阈值就将该缓冲区发送到服务器;
re:您的编辑:
我不必使用2个缓冲区。我想做的是:将来自不同线程的日志消息收集到缓冲区中,并在达到某个缓冲区使用阈值时将缓冲区发送到服务器
一个无锁的循环缓冲区可能会工作,读卡器会收集直到最后一个写入条目的所有数据。基于使用数组作为循环缓冲区来扩展现有的MPSC或MPMC队列可能是可能的;请参阅下面的提示。
不过,验证所有条目是否已完全写入仍然是一个问题,可变宽度条目也是一个问题。在长度+序列号的频带中这样做意味着你不能只将字节范围发送到服务器,并且读取器必须遍历";链表";(长度为"指针")来检查序列号,当它们不可避免地缓存未命中时,这是缓慢的
也许可以使用固定宽度的起始/结束位置对的辅助阵列来跟踪";完成";按序列号显示状态(写入程序在写入消息数据后,用释放存储存储序列号。看到正确序列号的读者知道,这次数据是通过循环缓冲区写入的,而不是上次。序列号提供ABA保护,而不是"完成"标志,读写器在读取时必须取消设置er)
我只是在头脑风暴ATM,我可能会回到这个话题,写下更多的细节或代码,但我可能不会。如果其他人想在这个想法的基础上写下答案,请放心。
进行某种非锁定自由同步可能更有效,以确保所有写入程序都通过了某个点。或者,如果每个写入程序都存储了它所声明的位置,则读取器可以扫描该阵列(如果只有几个写入程序线程),并找到最低的未完全写入的位置。
我想象的是,在检测到其增量已将队列的已用空间推过某个阈值后,编写器应该唤醒读取器(甚至自己执行任务)使阈值比您通常想要实际发送的阈值稍高,以说明以前的作者实际上不允许您阅读的部分内容。
如果设置为切换缓冲区:
我认为在切换缓冲区时可能需要某种锁定。(或者至少是更强的同步,以确保缓冲区中所有声称的空间实际上都已写入。)
但是在一个缓冲区内,我认为无锁定是可能的。这是有很大帮助还是有一点帮助取决于你如何使用它。来回跳动缓存行总是很昂贵的,无论这只是索引,还是锁加上一些写索引。如果两条消息不是全部64字节对齐(与缓存线边界对齐),则在两条消息之间的边界处也会出现错误共享
最大的问题是,在原子更新offset
时,缓冲区编号可能会发生变化。
每个缓冲区可能有一个单独的偏移量,并且在更改缓冲区时有一些额外的同步。
或者,您可以将缓冲区编号和偏移量打包到一个64位结构中,您可以尝试使用atomic_compare_exchange_weak
来访问CAS。这可以让编写器线程声明已知缓冲区中的空间量。您确实需要CAS,而不是fetch_add
,因为您无法为fetch_add设置上限;它将与任何单独的支票竞争。
因此,您读取当前偏移量,检查是否有足够的空间,然后尝试使用offset+msg_len
访问CAS。一旦成功,你就占据了缓冲区的那一部分。失败时,其他线程先得到它这与多生产者队列使用循环缓冲区所做的基本相同,但我们将其推广为保留一个字节范围,而不是仅使用CAS(&write_idx, old, old+1)
保留一个条目。
(如果你得到的最后一个偏移+len超过了缓冲区的末尾,也许可以使用fetch_add
并中止。如果你可以避免执行任何fetch_sub
来撤消它,这可能是好的,但如果你有多个线程试图通过更多的修改来撤消它们的错误,那就更糟了缓冲区的d,给定一些排序。CAS避免了这种情况,因为只有实际可用的偏移量才能交换。)
但您还需要一种机制来知道写入程序何时完成了对缓冲区声明区域的存储。因此,出于这个原因,可能需要围绕缓冲区更改进行额外的同步,以确保在我们让读者触摸它之前,所有挂起的写入都已经发生
使用循环缓冲区(例如,无锁进度保证)的MPMC队列通过只有一个缓冲区来避免这种情况,并在写入程序申请插槽并存储到其中后,为写入程序提供一个位置,将每次写入标记为使用释放存储完成。具有固定大小的插槽使这变得更容易;可变长度的消息会使其变得不平凡,或者根本不可行。
";声明一个字节范围";不过,我提出的机制非常像基于无锁数组的队列。写入程序尝试CAS写入索引,然后使用声明的空间。
显然,所有这些都将使用C11#include <stdatomic.h>
for_Atomic size_t offsets[2]
或GNU C内置__atomic_...
来完成
我认为这是无法以无锁的方式解决的,除非您只排除操作系统级锁定原语,并且可以在应用程序代码中使用短暂的旋转锁(这是个坏主意)。
为了进行讨论,让我们假设您的缓冲区是这样组织的:
#define MAXBUF 100
struct mybuffer {
char data[MAXBUF];
int index;
};
struct mybuffer Buffers[2];
int currentBuffer = 0; // switches between 0 and 1
虽然部分可以用原子级原语完成,但在这种情况下,整个操作必须以原子方式完成,因此这实际上是一个重要的关键部分。我无法想象任何一个编译器有一个独角兽原语来实现这一点。
查看GCC__atomic_add_fetch()
原语,这会将给定值(消息大小)添加到变量(当前缓冲区索引),返回新值;通过这种方式,您可以测试溢出。
查看一些不正确的粗略代码;
// THIS IS ALL WRONG!
int oldIndex = Buffers[current]->index;
if (__atomic_add_fetch(&Buffers[current]->index, mysize, _ATOMIC_XX) > MAXBUF)
{
// overflow, must switch buffers
// do same thing with new buffer
// recompute oldIndex
}
// copy your message into Buffers[current] at oldIndex
这在各个方面都是错误的,因为几乎在每一点上,其他线程都可能潜入并从你身上改变事情,造成严重破坏。
如果您的代码获取了恰好来自缓冲区0的oldIndex
,但在if
测试运行之前,其他线程偷偷进入并更改了当前缓冲区,该怎么办?
__atomic_add_fetch()
会在新的缓冲区中分配数据,但您会将数据复制到旧的缓冲区。
这就是NASCAR的比赛条件,我看不出如果不将整个过程视为关键部分,让其他过程等待轮到他们,你怎么能做到这一点。
void addDataTobuffer(const char *msg, size_t n)
{
assert(n <= MAXBUF); // avoid danger
// ENTER CRITICAL SECTION
struct mybuffer *buf = Buffers[currentBuffer];
// is there room in this buffer for the entire message?
// if not, switch to the other buffer.
//
// QUESTION: do messages have to fit entirely into a buffer
// (as this code assumes), or can they be split across buffers?
if ((buf->index + n) > MAXBUF)
{
// QUESTION: there is unused data at the end of this buffer,
// do we have to fill it with NUL bytes or something?
currentBuffer = (currentBuffer + 1) % 2; // switch buffers
buf = Buffers[currentBuffer];
}
int myindex = buf->index;
buf->index += n;
// copy your data into the buffer at myindex;
// LEAVE CRITICAL SECTION
}
我们对这些数据的使用者一无所知,因此我们无法判断它是如何收到新消息的,也无法判断您是否可以将数据副本移动到关键部分之外。
但是关键部分中的所有内容都必须以原子方式完成,而且由于您无论如何都在使用线程,因此您还可以使用线程支持附带的原语。可能是Mutex。
这样做的一个好处是,除了避免竞争条件外,关键部分中的代码不必使用任何原子基元,可以是普通的(但小心的)C代码。
另外一点需要注意的是:可以用一些互锁的交换恶作剧来滚动您自己的关键部分代码,但这是一个可怕的想法,因为它很容易出错,使代码更难理解,并避免了为这个目的设计的久经考验的线程基元。