我在许多配置中使用Python和C++中的ZeroMQ,我想知道从另一个线程中止recv()
或poll()
的最优雅方法是哪一种(例如,在受控程序终止的情况下,如果您想停止侦听而不需要终止套接字)。
与此问题相反,我不仅想避免无限等待,还想从recv()
或poll()
立即返回。
我知道我可以提供一个timeout
,然后像这样中止recv()
:
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
while _running:
if poller.poll(timeout=100) == []:
# maybe handle unwanted timout here..
continue
handle_message(socket.recv())
这将无休止地轮询套接字,直到_running
从另一个线程设置为False
——在我完成最多100毫秒之后。
但这并不好——我有一个繁忙的循环,用这种方式很难处理实际的超时,这可能是不需要的行为造成的。我也必须等待超时,这在大多数情况下并不重要,但。。你知道我的意思。
当然,我可以为堕胎投票:
abort_socket = context.socket(zmq.SUB)
abort_socket.setsockopt(zmq.SUBSCRIBE, b"")
abort_socket.connect(<abort-publisher-endpoint>)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(abort_socket, zmq.POLLIN)
while _running:
poll_result = poller.poll(timeout=1000)
if socket in poll_result:
handle_message(socket.recv())
elif abort_socket in poll_result:
break
else:
# handle real timeout here
pass
但这种方法也有缺点:
- 这有点冗长-在我触发中止的地方,我必须创建一个发布者,并使用它来中止接收器
abort_socket
只能从一个线程使用,所以我必须确保
所以我的问题是:这是如何做到的?
我可以在其他语言中使用类似Python的threading.Event
或s.th.的东西,而不是像这样传递给轮询器的中止套接字吗?:
def listener_thread_fn(event)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(event, zmq.POLLIN)
while _running:
poll_result = poller.poll(timeout=1000)
if socket in poll_result:
handle_message(socket.recv())
elif event in poll_result:
break
else:
# handle real timeout here
pass
因此,您只需要首先创建一个theading.Event()
,将其传递给listener_thread_fn
,然后从任何线程调用event.set()
即可中止。
使用Python和pyzmq
,recv()
或poll()
中断时会引发错误;因此,您可以简单地在异常发生时捕获它。以recv()
为例:
while True:
try:
request = server.recv()
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
print('Clean exit now!')
break
raise
您可以很容易地修改该代码以使用poll()
(过程相同)。此外,请注意,您需要:
import errno