在单写多读线程中交换缓冲区



故事

有一个writer线程,定期从某处收集数据(实时的,但这在问题中并不重要)。有很多读者从这些数据中阅读。通常的解决方案是使用两个读写锁和两个缓冲区,如下所示:

Writer (case 1):
acquire lock 0                        
loop
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

Writer (case 2):
acquire lock 0                        
loop
    acquire other lock
    free this lock
    swap buffers
    write to current buffer
    wait for next period

问题

在这两种方法中,如果获取其他锁操作失败,则不会进行交换,并且写入器将覆盖其先前的数据(因为写入器是实时的,它不能等待读取器),因此在这种情况下,所有读取器都会丢失该数据帧。

这不是什么大问题,阅读器是我自己的代码,它们很短,所以有了双缓冲区,这个问题就解决了,如果有问题,我可以让它三重缓冲区(或更多)。

问题是我想要最小化的延迟。想象情况1:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up, and again writes to buffer0

在**此时**,理论上其他的读取器可以读取buffer0的数据,只要写入器可以在读取器完成后进行交换,而不是等待下一个周期。在这种情况下发生的是,仅仅因为一个读取器晚了一点,所有读取器都错过了一帧数据,而这个问题是完全可以避免的。

情况2类似:

writer writes to buffer0                reader is idle
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)
|
|                                       reader starts reading buffer1
writer wakes up                         |
it can't acquire lock1                  because reader is still reading buffer1
overwrites buffer0

我尝试混合解决方案,所以作者尝试在写完后立即交换缓冲区,如果不可能,就在下一个时期醒来后。像这样:

Writer (case 3):
acquire lock 0                        
loop
    if last buffer swap failed
        acquire other lock
        free this lock
        swap buffers
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

现在延迟的问题仍然存在:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up
swaps buffers
writes to buffer1

再次在**这个点**,所有的读取器都可以开始读取buffer0,这是buffer0被写入后的短暂延迟,但它们必须等到写入器的下一个周期。

的问题

问题是,我如何处理这个?如果我想让写入器精确地在期望的周期执行,它需要使用RTAI函数等待周期,我不能像

那样做
Writer (case 4):
acquire lock 0                        
loop
    write to current buffer
    loop a few times or until the buffer has been swapped
        sleep a little
        acquire other lock
        free this lock
        swap buffers
    wait for next period

这引入了抖动。因为"几次"可能会比"等待下一个周期"还要长因此,作者可能会错过句号的开始。

为了更清楚,这是我想要发生的:

writer writes to buffer0                reader is reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      As soon as all readers finish reading,
|                                         the buffer is swapped
|                                       readers start reading buffer0
writer wakes up                         |
writes to buffer1

我已经发现了什么

我发现read-copy-update,据我所知,它一直为缓冲区分配内存并释放它们,直到读取器完成它们,这对我来说是不可能的,原因有很多。第一,线程在内核空间和用户空间之间共享。其次,使用RTAI,您不能在实时线程中分配内存(因为这样您的线程将调用Linux的系统调用,从而破坏实时活动!)(更不用说使用Linux自己的RCU实现是无用的,因为同样的原因)

我还考虑过有一个额外的线程,以更高的频率尝试交换缓冲区,但这听起来不像一个好主意。首先,它本身需要与写入器同步,其次,我有许多这样的写入器-读取器并行工作在不同的部分,每个写入器一个额外的线程似乎太多了。所有写入器的一个线程在与每个写入器同步方面似乎非常复杂。

读写锁使用的是什么API ?您是否有一个定时锁,如pthread_rwlock_timedwrlock?如果是,我认为这是一个解决你的问题,就像下面的代码:

void *buf[2];
void
writer ()
{
  int lock = 0, next = 1;
  write_lock (lock);
  while (1)
    {
      abs_time tm = now() + period;
      fill (buf [lock]);
      if (timed_write_lock (next, tm))
        {
          unlock (lock);
          lock = next;
          next = (next + 1) & 1;
        }
      wait_period (tm);
    }
}

void
reader ()
{
  int lock = 0;
  while (1)
    {
      reade_lock (lock);
      process (buf [lock]);
      unlock (lock);
      lock = (lock + 1) & 1;
    }
}

这里发生的事情是,对于写入器来说,它是等待锁还是等待下一个周期并不重要,只要它确定在下一个周期到来之前醒来即可。绝对超时确保了这一点。

这不正是三重缓冲应该解决的问题吗?我们有3个缓冲区,分别叫它们write1 write2和read。写线程在写write1和write2之间交替进行,确保它们永远不会阻塞,并且最后一个完整的帧总是可用的。然后在读线程中,在某个适当的点(比如,在读取帧之前或之后),读缓冲区与可用的写缓冲区互换。

虽然这将确保写入器永远不会阻塞(只需翻转两个指针就可以非常快速地完成缓冲区翻转,甚至可能使用CAS原子而不是锁),但仍然存在一个问题,即读取器必须等待其他读取器在翻转之前完成读缓冲区的操作。我想这可以用rcu式的读缓冲区池来解决,其中可用的一个可以被翻转。

  • 使用队列(FIFO链表)
  • 实时写入器总是将(enqueue)附加到队列的末尾
  • 读取器总是从队列的开头移除(dequeue)
  • 如果队列为空,读取器将阻塞

编辑以避免动态分配

我可能会使用循环队列…我会使用内置的__sync原子操作。http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html Atomic-Builtins

  • 循环队列(FIFO 2d数组)
    • ex: byte[][] Array = new byte[MAX_SIZE][BUFFER_SIZE];
    • 起始和结束索引指针
  • 写入器覆盖数组缓冲区[End][]
    • Writer可以在
    • 周围循环时增加Start
  • Reader从Array[Start][]获取buffer
    • 如果Start == End
  • 如果您不想让写程序等待,那么它可能不应该获取其他人可能持有的锁。我会让它执行某种同步,虽然,以确保它写的东西真的被写出来-通常,大多数同步调用将导致内存刷新或屏障指令被执行,但细节将取决于cpu的内存模型和线程包的实现。

    我会看看是否有其他更适合的同步原语,但如果迫近,我会让写器锁定并解锁一个没有人使用的锁。

    读者必须准备好时不时地错过一些东西,并且必须能够发现他们错过了一些东西。我会将有效性标志和长序列计数与每个缓冲区关联起来,并让编写器执行类似"清除有效性标志、增加序列计数、同步、写入缓冲区、增加序列计数、设置有效性标志、同步"的操作。如果读取器读取序列计数,同步,看到有效性标志为true,读取数据,同步,并重新读取相同的序列计数,那么也许有希望它没有得到乱码数据。

    如果你要这样做,我会详尽地测试它。这对我来说似乎是合理的,但它可能不适用于从编译器到内存模型的所有特定实现。

    另一种方法是在缓冲区中添加校验和,并在最后写入校验和。

    参见对无锁算法的搜索,如http://www.rossbencina.com/code/lockfree

    与此同时,您可能需要一种方法让作者向睡眠的读者发出信号。你可以使用Posix信号量来实现这一点——例如,当一个特定的信号量达到给定的序列号时,或者当一个缓冲区变得有效时,读取器要求写入器调用sem_post()。

    另一个选择是坚持使用锁,但要确保读取器不会长时间持有锁。通过在读取锁期间不做任何其他事情,只从写入器的缓冲区复制数据,可以使读取锁所需的时间保持在较短且可预测的范围内。唯一的问题是,低优先级的读取器可能会在写入过程中被高优先级的任务中断,解决方法是http://en.wikipedia.org/wiki/Priority_ceiling_protocol。

    鉴于此,如果写线程具有高优先级,则每个缓冲区所做的最坏情况是写线程填充缓冲区,而每个读线程将数据从该缓冲区复制到另一个缓冲区。如果在每个周期中都能负担得起,那么写入线程和一定数量的读取器数据复制将始终完成,而读取器处理其复制的数据可能会或可能不会完成其工作。如果它们不这样做,它们就会落后,并且在下次抓取锁并查看要复制哪个缓冲区时才会注意到这一点。

    顺便说一下,我阅读实时代码的经验(当需要显示bug存在时,而不是在我们的代码中)是令人难以置信的和故意的简单思想,非常清楚地布局,并不一定比满足其最后期限所需的效率更高,所以一些明显毫无意义的数据复制为了直接锁定工作可能是一个很好的交易,如果你能负担得起的话。