PyzMQ 代理在订阅多个进程后处于奇怪的状态



我在pyzmq中的代理上遇到了一个奇怪的问题。这是该代理的代码:

import zmq
context = zmq.Context.instance()
frontend_socket = context.socket(zmq.XSUB)
frontend_socket.bind("tcp://0.0.0.0:%s" % sub_port)
backend_socket = context.socket(zmq.XPUB)
backend_socket.bind("tcp://0.0.0.0:%s" % pub_port)
zmq.proxy(frontend_socket, backend_socket)

我正在使用该代理在 50 台不同机器上运行的 ~6 个进程之间发送消息。主题总量约为 1,000,但由于多个进程可以侦听相同的主题,因此订阅总量约为 10,000。

在正常情况下,这非常有效,只要一个进程发布它并且至少有一个其他进程订阅了该主题,消息就会正确通过代理。无论发布服务器还是订阅服务器首先启动,它都有效。

但是在某个时间点,当我们开始一个新过程(我们称之为X(时,它开始表现得很奇怪。已经连接的所有内容都可以正常工作,但是我们连接的新进程只有在发布者之前连接时才能通过消息。X可以是正常工作的任何一个进程,它可以来自任何机器,结果是相同的。当我们进入这种状态时,杀死 X 会使一切再次工作,再次启动它会失败。如果我们停止其他进程然后启动 X,它运行良好(因此它与 X 的代码无关(。

我不确定我们是否可以达到ZMQ的某个极限?我读过一些例子,他们似乎拥有比我们更多的流程、订阅等。这可能是我们应该在代理上设置的一些选项,到目前为止,以下是我们尝试过但没有成功的选项:

  • 更改 frontend_socket 上的 RCVHWM
  • 更改 SNDHWM backend_socket
  • 在backend_socket上设置XPUB_VERBOSE
  • 在backend_socket上设置XPUB_VERBOSER

以下是我们如何将消息发布到代理的示例代码:

topic = "test"
message = {"test": "test"}
context = zmq.Context.instance()
socket = context.socket(zmq.PUB)
socket.connect("tcp://1.2.3.4:1234")
while True:
time.sleep(1)
socket.send_multipart([topic.encode(), json.dumps(message).encode()])

以下是我们如何订阅来自代理的消息的示例代码:

topic = "test"
context = zmq.Context.instance()
socket = context.socket(zmq.SUB)
socket.connect("tcp://1.2.3.4:5678")
socket.subscribe(topic)
while True:
multi_part = socket.recv_multipart()
[topic, message] = multi_part
print(topic.decode(), message.decode())

有没有人见过类似的问题?我们可以做些什么来避免代理进入这种状态?

谢谢!

将所有发布者(代理和发布过程(设为 XPUB( + sockopt verbose/verboser(,然后在轮询循环中从发布者套接字读取。订阅消息的第一个字节将告诉您消息是否为 sub/unsub,后跟主题/主题。如果您使用时间戳记录所有这些信息,它应该告诉您哪个组件出现故障(可能是三个组件中的任何一个(并帮助修复。

到达发布者 (XPUB( 的订阅消息的格式将为

  • 订阅[0x01][topic]
  • 取消订阅[0x00][topic]

所需代码

我通常研究C++但这是python的一般想法

代理

您需要创建一个捕获套接字(这就像一个网络分流器(。通过 inproc 将ZMQ_PAIR套接字连接到代理(捕获(,然后读取套接字另一端的内容。当您使用XPUB/XSUB时,您将看到订阅消息。

zmq.proxy(frontend, backend, capture)

阅读 Python 代理的文档/示例。

发行人

在这种情况下,您需要从发布套接字读取与其发送的同一线程中的发布套接字。这就是我说投票循环可能是最好的原因。

此代码根本不经过测试。

topic = "test"
message = {"test": "test"}
context = zmq.Context.instance()
socket = context.socket(zmq.XPUB)
socket.connect("tcp://1.2.3.4:1234")
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
timeout = 1000  #ms
while True:
socks = dict(poller.poll(timeout))
if not socks : # 1
socket.send_multipart([topic.encode(), json.dumps(message).encode()])
if socket in socks:
sub_msg = socket.recv()  
# print out the message here.

最新更新