单个生产者和多个单线程消费者



我的应用程序从网络接收数据包,并将它们分派给一个或多个"处理器"。(每个数据包都属于一个预定义的"流",可以通过查看数据包数据来识别。)

目前只有一个线程可以完成所有的工作:

  1. 从网络设备获取数据包
  2. 识别每个数据包的处理器
  3. 将数据包调度到其处理器

以每秒2000万个数据包(10Gbps的60字节数据包)的速率接收传入数据。

然而,此解决方案只能满足极少数流和处理器的要求。例如,在10个流的情况下,已经有大约10-20%的数据包丢失。

由于步骤(3)是最昂贵的步骤,我计划将该工作委托给一个工作线程池。

但是,我必须小心,因为处理器本身不是线程安全的。因此,只有一个工作线程可以同时将数据包发送到同一处理器。

对于基于任务的编程来说,这似乎是一个很好的用例。但我无法轻易地将TBB文档中解释的设计模式与我的问题相匹配。

所以我的问题是:我如何组织我的使用者线程,使它们将数据包均匀地分配给单线程处理器?

我并不期待一个完全解决方案,但我会对你的建议或随机想法感到满意:)

我做了一些嵌入式编程,在那里我必须处理相对较高的吞吐量——没有这里那么快!希望你使用的硬件比我习惯的要强大得多。。。有一些简单的策略应该适用于你的情况!

1.输入/处理队列和相关的内存管理至关重要

如果数据速率较高,则传入数据的队列必须非常高效。您应该尽可能少地进行处理,否则可能会丢失设备中的数据。(我习惯于从某种缓冲区相对较小的快速串行设备中读取数据,因此在不丢失数据的情况下,设备可以保留多长时间是有实时限制的。这让我养成了将设备读取作为一项完全独立的任务来处理的习惯,只处理读取数据,而不处理其他事情。)

一系列非常简单的固定大小的预分配缓冲区的效率与它一样高:有一个"空闲"缓冲区队列和一个"已填充"缓冲区的队列。如果您使用无锁链表,那么维护这些列表可能非常快,并且入队/出队操作在许多操作系统中非常常见。

避免使用malloc或其他动态分配,因为当他们需要管理自己的"空闲"one_answers"已分配"块的数据结构时,会有显著的(通常是不可预测的)开销。如果生产者或工作线程同时释放或分配内存,它们还可能执行锁定,从而可能不可预测地阻塞生产者或工作进程线程。相反,尝试找到较低级别的例程,用于分配和释放操作系统为队列提供的整个页面(unixy平台上的mmap,VirtualAllocEx)。它们通常需要做的工作要少得多,因为它们使用MMU功能来映射RAM的物理页面,内存中没有复杂的数据结构需要维护,每次调用都有更可靠的运行时,并且可以足够快地扩展空闲列表(如果空闲列表不足)。

在生产商中,不要担心单位小于整块。从队列中取出一个空闲块,将一个数据块打包,然后将其添加到要处理的队列中。如果你必须确保每个数据包都在固定的时间段内处理,或者你需要处理"突发"数据率,那么仍然可以尝试从输入设备中读取完整的缓冲区,但可以将块的大小减少到"合理"的时间量,也可以使用超时将部分填充的块排队处理,并用某种空数据包"填充"剩余部分。我发现,这样做通常比必须包含大量代码来处理部分填充的缓冲区更快。

如果可以的话,请非常小心地设置生产者线程的处理器相关性和线程优先级。理想情况下,您希望生产者线程比任何消费者线程具有更高的优先级,并绑定到特定的核心。没有什么可以阻止在缓冲区空间不足的情况下读取传入数据。

2.处理

你说过有:

  1. 几个流
  2. 几个"处理器",它们不是线程安全的

在这里做的有用的事情是在数据包上并行运行处理器,但从你的问题中还不清楚这在多大程度上是可能的。

处理器在流之间是线程安全的吗?(我们能在两个不同的线程中运行一个处理器吗?只要它们在两种不同的流上运行?)

同一流中不同处理器之间的处理器是否线程安全?(我们可以在不同的线程中在同一个流上运行多个处理器吗?)

处理器是否需要按特定顺序运行?

在不知道这一点的情况下,仍然有一些通用的东西是有用的建议。

让第二个线程处理从生产者读取完整的缓冲区,并将它们调度到适当的处理器(在其他线程中),然后将完整的缓冲区时放回"空"队列中进行处理。虽然你失去了一些直线效率(一个线程进行读取和调度会比两个线程稍微"快"一点),但如果存在瞬时锁定,至少这种方式不会阻止输入设备的读取。

通过创建或查找库,可以将作业分配到线程池,特别是当与可以并行运行的线程数量相比,您有许多处理器时。还实现某种允许作业之间的一些简单关系的作业队列是相对简单的(例如"该作业要求首先完成作业X和Y","该作业不能与使用相同处理器的任何其他作业并行运行")。即使是作业管理器只在第一个可用线程上运行第一个可运行作业的简单策略也会非常有效。

尽量避免复制。如果处理器可以在不从缓冲区复制数据包的情况下"就地"处理数据包,那么您就节省了很多毫无意义的周期。即使必须进行复制,让多个线程从"只读"共享缓冲区复制数据也比让一个线程将消息复制并调度到多个线程要好。

如果检查处理器是否应该为给定的数据包运行非常快,那么你最好有几个作业,每个作业都检查它是否应该进行一些处理。与其让一个线程来确定哪些处理器应该在哪些数据包上运行,不如让多个线程来更快,每个处理器或处理器组一个线程,检查每个数据包是否应该运行它的处理器。这可以归结为这样一种想法,即在几个线程中对只读资源进行几次简单的检查可能比在线程之间进行同步花费更少的时间。

如果您可以并行运行处理器(如果它们正在处理来自不同流的数据),那么对数据进行传递以获得流的列表,然后为每个流启动一个作业是一个好主意。您也可以收集属于每个流的数据包列表,但同样,这是一个作业检查每个数据包的速度与在单个线程中收集该列表并将每个数据包传递给各自作业所需的时间之间的权衡。

希望这些策略中的一些对您的情况有用!让我们知道结果如何。。。你要处理的数据太多了,最好知道什么对比我习惯的更快的数据速率有效,什么无效!祝你好运

这是我对一个可能的解决方案的想法。

假设我们有n个处理器。让我们引入n个互斥,每个处理器一个。让我们还介绍一个数据包队列。所有传入的数据包都被放入这个队列中。

工作线程的操作方式如下:

  1. 从传入数据包队列中抓取数据包
  2. 确定必要的处理器
  3. 尝试获取相应的互斥。如果锁定获取成功,则处理数据包。否则,请重新排队并转到1
  4. 处理完成后,转到步骤1

可能的缺点:

  1. 数据包被重新排队,这意味着它们可能会被延迟/处理,这可能会破坏您的交易(不确定)
  2. 队列上的争用率可能很高。您可能想研究使用一个无锁队列来实现这一点
  3. 队列显然会消耗额外的内存,我不知道你是否有多余的内存

EDIT:更多关于内存消耗的想法-当然,可以对队列可以消耗的内存量设置上限-那么,问题是当内存用完时该怎么办。我想说,最好的办法就是开始丢弃数据包(我的印象是,在你的情况下,丢弃几个数据包并不是什么大不了的事),直到队列有点耗尽。

与此相关的是——我认为这个用例的一个好的队列实现应该不惜一切代价避免动态内存分配——预先分配内存,并确保在关键代码路径上没有分配。

为什么不能使用多个队列,每个处理器一个?这些队列可以是无锁的(没有互斥)。

  1. 从网络设备获取数据包
  2. 识别每个数据包的处理器(PID)
  3. 将数据包推送到队列
  4. 工作者:处理队列中的数据包[k]

对于类似的问题,我使用无锁环形缓冲区的轮询来自动覆盖最旧的数据包。

最新更新