立即从另一个线程中止zeromq-recv()或poll(),而无需等待超时



我在许多配置中使用Python和C++中的ZeroMQ,我想知道从另一个线程中止recv()poll()的最优雅方法是哪一种(例如,在受控程序终止的情况下,如果您想停止侦听而不需要终止套接字)。

与此问题相反,我不仅想避免无限等待,还想从recv()poll()立即返回

我知道我可以提供一个timeout,然后像这样中止recv()

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
while _running:
    if poller.poll(timeout=100) == []:
        # maybe handle unwanted timout here..
        continue
    handle_message(socket.recv())

这将无休止地轮询套接字,直到_running从另一个线程设置为False——在我完成最多100毫秒之后。

但这并不好——我有一个繁忙的循环,用这种方式很难处理实际的超时,这可能是不需要的行为造成的。我也必须等待超时,这在大多数情况下并不重要,但。。你知道我的意思。

当然,我可以为堕胎投票:

abort_socket = context.socket(zmq.SUB)
abort_socket.setsockopt(zmq.SUBSCRIBE, b"")
abort_socket.connect(<abort-publisher-endpoint>)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(abort_socket, zmq.POLLIN)
while _running:
    poll_result = poller.poll(timeout=1000)
    if socket in poll_result:
        handle_message(socket.recv())
    elif abort_socket in poll_result:
        break
    else:
        # handle real timeout here
        pass

但这种方法也有缺点:

  • 这有点冗长-在我触发中止的地方,我必须创建一个发布者,并使用它来中止接收器
  • abort_socket只能从一个线程使用,所以我必须确保

所以我的问题是:这是如何做到的?

我可以在其他语言中使用类似Python的threading.Event或s.th.的东西,而不是像这样传递给轮询器的中止套接字吗?:

def listener_thread_fn(event)
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    poller.register(event, zmq.POLLIN)
    while _running:
        poll_result = poller.poll(timeout=1000)
        if socket in poll_result:
            handle_message(socket.recv())
        elif event in poll_result:
            break
        else:
            # handle real timeout here
            pass

因此,您只需要首先创建一个theading.Event(),将其传递给listener_thread_fn,然后从任何线程调用event.set()即可中止。

使用Python和pyzmqrecv()poll()中断时会引发错误;因此,您可以简单地在异常发生时捕获它。以recv()为例:

while True:
    try:
        request = server.recv()
    except zmq.ZMQError as e:
        if e.errno == errno.EINTR:
            print('Clean exit now!')
            break
        raise

您可以很容易地修改该代码以使用poll()(过程相同)。此外,请注意,您需要:

import errno

最新更新