如何使用PUB/SUB防止缓冲/延迟



我将视频作为一系列图像(相当于zmq消息(发送,但有时,也许当网络速度较慢时,它们的接收速度比发送速度慢,并且出现越来越长的延迟,似乎高达大约一分钟的视频或100秒的图像或兆字节的数据。它通常最终通过订阅者以比发布者更快的速度接收消息来清除自身。

相反,我希望它像在订阅者发送recv太慢的情况下那样丢弃丢失的消息。我希望zmq.CONFLATE=1能做到这一点,但事实并非如此。那怎么办?我怀疑它们是在发布服务器上缓冲的,而发布服务器不应该有任何zmq缓冲区,或者以某种方式在网络堆栈中。

简化的服务器代码

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:12345")
camera = PiCamera()
stream = io.BytesIO()
for _ in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
stream.truncate()
stream.seek(0)
socket.send(stream.read())
stream.seek(0)

简化的客户端代码

# Initialization
self.context = zmq.Context()
self.video_socket = self.context.socket(zmq.SUB)
self.video_socket.setsockopt(zmq.CONFLATE, 1)
self.video_socket.setsockopt(zmq.SUBSCRIBE, b"")
self.video_socket.connect("tcp://" + ip_address + ":12345")
def get_image(self):
# Receive the latest image
poll_result = self.video_socket.poll(timeout=0)
if poll_result == zmq.POLLIN:
return self.video_socket.recv()
else:
return None

发布者使用的是树莓派,订阅者使用的是Windows。

我不确定您使用的是哪种版本的python-zmq,但基于底层的c++libzmq,您需要:

  • 在服务器套接字上设置ZMQ_SNDHWM套接字选项
  • 在客户端套接字上设置ZMQ_RCVHWM套接字选项

在pub/sub的情况下,这些选项限制了每个完成连接要排队的消息数。如果队列变得大于HWM(高水位线(,则消息将被丢弃。

同时关闭合并,因为这会干扰这些选项。

还可以在服务器上设置zmq.CONFLATE=1,以便只保留发送队列中的最新消息。

绑定服务器套接字之前

socket.setsockopt(zmq.CONFLATE, 1)

出于某种原因,我错误地认为PUB套接字没有发送队列,但它确实有。

最新更新