处理大量传入字节数组流的最佳缓冲区架构



我正在寻求如何最好地构建缓冲区结构的建议,该缓冲区结构可以处理大量的传入数据,这些数据的处理速度比传入数据慢。

我编程了一个定制的二进制读取器,它可以在单个线程上每秒流式传输多达1200万个字节数组,并希望在同一台机器和不同的线程上以单独的结构处理字节数组流。问题是,消费结构无法跟上生产者的传入数据量,因此我认为我需要某种缓冲区来正确处理这一问题。我最感兴趣的是关于整体架构的建议,而不是代码示例。我的目标是.Net 4.0。以下是我当前设置和要求的更多信息。

Producer:在专用线程上运行,从物理存储介质(SSD、OCZ Vertex 3 Max IOPS)上的文件中读取字节阵列。吞吐量大约为每秒1200万字节阵列。每个数组只有16字节大小。完全实现

使用者:应该在一个独立于生产者的线程上运行。使用字节数组,但在处理数据之前必须解析为几个基元数据类型,因此处理速度明显慢于生产者的发布速度。消费者结构得到充分实施。

介于两者之间:希望建立一个缓冲结构,为生产者提供发布,为消费者提供消费。未实施。

如果你们中的一些人能从自己的经验或专业知识中评论一下,为了处理这种结构,最好考虑什么,我会很高兴。缓冲区是否应该实现一种节流算法,只在缓冲区/队列为半空时向生产者请求新数据?如何处理锁定和阻塞?很抱歉,我在这个领域的经验非常有限,到目前为止,我已经通过消息总线的实现来处理它,但我所研究的任何消息总线技术都肯定无法处理我所寻求的吞吐量。欢迎任何评论!!!

编辑:别提了,数据只由一个消费者消费。数组发布的顺序也很重要;订单需要保留,以便消费者按照相同的顺序消费。

16字节(称为16B)太小,无法进行高效的线程间通信。将如此小的缓冲区排队将导致在线程间通信上花费的CPU比在实际有用的数据处理上花费的更多。

所以,把它们堆起来。

声明某个缓冲区类(比如C16B),它包含一个漂亮的、大的16B数组(至少值4K),以及一个"count"int来显示加载了多少(从文件中加载的最后一个缓冲区可能没有满)。如果你把一个缓存行大小的空字节数组放在这个16B数组前面,这将有所帮助-有助于避免错误共享。你可以把处理16B的代码作为一个方法,"Process16B",sya,也许还有加载数组的代码-以文件描述符为参数。这个类现在可以有效地加载到队列中的其他线程。

您需要一个生产者-消费者队列类-C#在BlockingCollection类中已经有了一个。

您需要在此应用程序中进行流量控制。我会通过创建一个C16B的池来实现这一点——创建一个阻塞队列,并在循环中创建/添加一大堆C16B。1024是个不错的整数。现在您有了一个"池队列",它提供流控制,避免了新建()任何C16B,也不需要不断地对它们进行垃圾收集。

一旦你有了这个,剩下的就很容易了。在加载器线程中,不断地从池队列中将C16B从队列中移出,将它们与文件中的数据一起加载,并将它们添加到"16Bprocess"阻塞队列上的处理线程中。在处理线程中,从16B进程队列中获取(),并通过调用其Process16B方法来处理每个C16B实例。处理完16B后,将C16B()添加回池队列以供重用。

通过池队列回收C16B提供端到端的流控制。如果生产者是最快的链路,则池最终将为空,生产者将在那里阻塞,直到消费者返回一些C16B。

如果处理需要这么多时间,那么如果有可用的空闲内核,您可以随时添加另一个处理线程。这种方案的问题是数据处理会出现问题。这可能很重要,也可能无关紧要。如果是这样,数据流稍后可能需要"整理",例如使用序列号和缓冲区列表。

我建议将池队列计数(可能还有16B进程队列计数)转储到带有计时器的状态组件或命令行。这提供了C16B实例所在位置的有用快照,您可以在没有第三方工具的情况下看到瓶颈和任何C16B泄漏(这些工具会将整个应用程序拖慢到爬行状态,并在关闭时发布虚假的泄漏报告)。

您可以使用BlockingCollection,只要消费者没有消费足够的商品,它就会阻止生产者向收藏中添加商品。

还有其他并发收集类,例如ConcurrentQueue

IMO某种阻塞队列可以解决您的问题。从本质上讲,如果队列没有更多容量,Producer线程将阻塞。看看这个创建阻塞队列<T>在.NET中?

为什么要使用缓冲区?使用磁盘文件作为缓冲区。当消费者开始处理一个字节数组时,让读取器读取下一个,就这样了

编辑:在要求消费者和生产者脱钩之后。

您可以有一个协调器,它告诉生产者生产X字节数组,并将X字节数组提供给消费者。三个部分的作用如下:

协调器告诉生产者生成X字节数组。Producer生产X字节阵列

现在循环执行此操作:协调员告诉消费者到消费者X字节数组协调员告诉生产者生产X字节数组消费者告诉协调者已经完成消费循环直到不再有字节数组

生产者和协调器可以在同一个线程中运行。消费者应该有自己的思路。

您将几乎没有锁定(我认为您可以在完全没有锁定的情况下做到这一点,只需消费者使用一个等待句柄来通知协调器它已经完成),并且您的协调器非常简单。

REEDIT:另一个真正的解耦选项

使用ZeroMQ处理通信。生产者读取字节数组,并将每个数组发布到ZeroMQ套接字。使用者从ZeroMQ套接字读取数组。

ZeroMQ非常高效和快速,并在内部处理所有技术细节(线程同步、缓冲等)。在同一台计算机上使用时,您也不会遭受任何数据丢失(在两台不同的计算机上使用UDP时可能会发生这种情况)。

相关内容

  • 没有找到相关文章

最新更新