双缓冲区系统并发处理的设计



我有一个长期运行的应用程序,它基本上是:

  1. 从网络上读取数据包
  2. 把它保存在某个地方
  3. 处理并输出到磁盘

这确实是一个非常常见的用例——除了数据大小和数据速率都可能非常大之外。为了避免内存溢出并提高效率,我考虑了一种双缓冲区设计,其中缓冲区a和B交替:当a保存网络数据包时,B被处理以输出。一旦缓冲区A达到软边界,A将用于输出处理,而B将用于保存网络数据包。

我对并发/多线程程序范例没有特别的经验。我读过一些关于处理多生产者和多消费者情况的循环缓冲区的讨论。我不确定这是否是最好的解决方案,而且双缓冲区的设计似乎更简单。

我的问题是:有没有一种设计模式可以让我来解决这个问题?还是更好的设计?如果可能的话,请使用伪代码来帮助说明解决方案。谢谢

我建议您不应该假设"两个"(或任何固定的数量的…)缓冲区,而应该简单地使用队列,从而使用"生产者/消费者"关系。

接收数据包的进程只需将数据包添加到某个特定大小的缓冲区中,然后,当缓冲区足够满经过指定的(短…)时间间隔时,将(非空)缓冲区放入队列中,由另一方处理。然后它分配一个新的缓冲区供自己使用。

接收("其他…")进程在任何时候都会被唤醒,队列中可能有新的缓冲区供其处理。它移除缓冲区,对其进行处理,然后再次检查队列。只有当它发现队列为空时,它才会进入睡眠状态(注意确保进程不能在另一个进程决定发出信号的确切时刻决定进入睡眠……这里不能有"竞争条件"。)

考虑简单地为"每条消息"分配存储(无论"消息"对您来说意味着什么),并将该"消息"放入队列,这样就不会因"等待缓冲区填满"而导致不必要的处理延迟

值得一提的是,在这种情况下,可以使用实时音频处理/录制中使用的一种技术,该技术使用足够大小的单个环形缓冲区(如果您喜欢这个术语,也可以使用fifo)。

然后您将需要一个读写光标。(你是否真的需要锁,或者是否可以使用易失性加内存屏障,这是一个敏感的话题,但如果性能很重要,portaudio的人建议你在没有锁的情况下这样做。)

您可以使用一个线程进行读取,也可以使用另一个线程来进行写入。读取线程应该消耗尽可能多的缓冲区。除非缓冲区空间用完,否则您将是安全的,但双缓冲区解决方案也存在这种情况。因此,基本的假设是,您可以比输入更快地写入磁盘,或者您需要扩展解决方案。

找到一个有效的生产者-消费者队列类。使用一个来创建缓冲池,以提高性能并控制内存使用。使用另一个将缓冲区从网络线程传输到磁盘线程:

#define CnumBuffs 128
#define CbufSize  8182
#define CcacheLineSize 128
    public class netBuf{
      private char cacheLineFiller[CcacheLineSize];  // anti false-sharing space
      public int dataLen;
      public char bigBuf[CbufSize];
    };
    PCqueue pool;
    PCqueue diskQueue;
    netThread Thread;
    diskThread Thread;
    pool=new(PCqueue);
    diskQueue=new(PCqueue);
    // make an object pool
    for(i=0;i<CnumBuffs,i++){
      pool->push(new(netBuf));
    };
    netThread=new(netThread);
    diskThread=new(diskThread);
    netThread->start();
    diskThread->start();
    ..
    void* netThread.run{
      netbuf *thisBuf;
      for(;;){
        pool->pop(&thisBuf};  // blocks if pool empty
        netBuf->datalen=network.read(&thisBuf.bigBuf,sizeof(thisBuf.bigBuf));
        diskQueue->push(thisBuf);
      };
    };
    void* diskThread.run{
      fileStream *myFile;
      diskBuf *thisBuf;
      new myFile("someFolderfileSpec",someEnumWrite);
      for(;;){
        diskQueue->pop(&thisBuf};  // blocks until buffer available
        myFile.write(&thisBuf.bigBuf,thisBuf.dataLen);
        pool->push(thisBuf};
      };
    };

最新更新