ZeroMQ:每种数据类型的套接字或只有一个套接字



我有一个程序,它从大约10个其他(传感器读取)程序(全部由我自己控制)接收信息。我现在想让他们使用 ZeroMQ 进行通信。

对于大多数队列来说,重要的是中央接收程序始终具有最新的传感器数据,所有较旧的消息都不再重要。如果丢失了几条消息,我不在乎。因此,对于所有这些,我都从单独的PUB/SUB插座开始;每个程序一个。但我不确定这是否是正确的方法。据我了解,我有两个选择:

  1. 为每个程序制作一个单独的套接字,并在循环中读出它们。这样我就通过套接字知道我正在接收的信息(我通常只是发送int)。
  2. 制作一个所有程序都连接到的套接字,并且每条消息我都会发送一个string,告诉接收端消息是关于什么的。

所有连接都是PUB/SUB的,因此创建一个套接字会很好。我只是不确定这是否是最有效的方法。

欢迎所有提示!

-PUB/SUB很好,并且允许从 N 传感器:1-记录器轻松转换为 N-传感器:2+-记录器
-人们也可能受益于插座与接入端口的概念分离,其中可能连接多个插座

如何始终获得 只是实际的(最后一个)传感器读数:

如果由于系统集成的限制,没有绑定到一些早期的 ZeroMQ API,那么通过.setsockopt( ZMQ_CONFLATE, True )方法有一个可爱的功能

ZMQ_CONFLATE:仅保留最后一条消息

如果设置,套接字应在其入站/出站队列中仅保留一条消息,此消息是接收的最后一条消息/要发送的最后一条消息。
忽略ZMQ_RCVHWMZMQ_SNDHWM选项。不支持多部分消息,特别是只有一部分消息保存在套接字内部队列中。


关于设计困境:

除非实时控制稳定性引入了一些硬性实时限制,否则PUB端可以自由决定指示新值.send()SUB(-s) 的频率。这里不需要魔法,在内部传出队列上设置ZMQ_CONFLATE选项更少。

SUB(-s) 侧接收器也将受益于在管理的内部传入队列上设置的ZMQ_CONFLATE选项,但给定一组单独的.bind()-s 实例化单独的着陆端口以提供不同的单个传感器读数,您的"最后"值将始终保持"最后"读数。如果所有读数都进入一个共同的着陆垫,您的接收过程将被屏蔽(丢失)所有读数,但只是在.recv()发生之前意外的"最后一个"读数,这不会有太大帮助,不是吗?

如果需要进行一些与 I/O 性能相关的调整,.Context( n_IO_threads )+ZMQ_AFFINITY映射选项可能会增加并优先处理ioDataPump可能利用的资源来提高 IO 性能

除非您遇到严格的实时要求,否则拥有不必要的套接字没有多大意义。ZMQ 的公平排队应该注意对每个传感器程序给予同等的关注(参见指南中的图 6)

如果传感器程序位于通过以太网连接的其他设备上,则程序的最终性能受计算机中以太网 NIC 带宽的限制。处理单个 PULL 套接字的单线程程序很有可能能够比通过 NIC 更快地处理传入的数据。

如果是这样,那么您也可以坚持使用单个套接字并享受更简单的代码。处理多个套接字并不难,但处理一个套接字要容易得多。例如,使用单个插座,您不必告诉每个传感器程序要连接到哪个网络端口 - 它可以是一个常量。

对于您的情况,PUSH/PULL 听起来比 PUB/SUB 更自然,但这不会有太大区别。

持久性

持久性将是你的(潜在)问题。像ZMQ这样的东西的全部意义在于,它们将按照发送的顺序传递消息。因此,您阅读了一条消息,根据定义,就收件人而言,它是"最后"消息。收件人不知道途中是否有另一条消息在传输过程中。

这是Actor模型架构的一个特性(这就是ZMQ)。消息在传输中得到缓冲,并且在读取消息时没有关于消息新的信息。您所知道的是,它是提前一段时间发送的。没有与发送方的执行会合。

现在,您要么像处理最后一条消息一样处理它,要么等待一段时间,看看是否有另一条消息出现,然后再处理它。最简单的方法是简单地处理每条消息,就好像它是最后一条消息一样。

将其与通信顺序流程体系结构进行对比。它与 Actor 模型体系结构基本相同,只是传输不缓冲消息。消息发送阻止,直到收件人调用消息读取。

因此,当您阅读邮件时,收件人知道它是发件人发送的最后一封邮件。并且发件人知道它发送的消息已经在那一刻被收件人收到。所以最后性的知识是绝对的——收到的消息确实是发送的最后一个。

但是,除非你有一些相当重量级的事情发生,否则我不会担心它。您很可能能够跟上传感器数据流,即使您正在阅读的消息不是队列中的最新消息。

通过将发送端套接字上的高水限制设置为 1,您几乎可以将 ZMQ 变成 CSP。这意味着您最多可以缓冲 1 条消息。这与 0 不同,不幸的是,将 HWM 设置为 0 意味着"无限大小的缓冲区"。

最新更新