pyzmq socket.recv() with NOBLOCK 标志行为



我使用pyzmq创建了简单的客户端/服务器。

我不确定的一件事是,即使消息是从服务器发送的,.recv()也不会收到消息。它只是忽略它并抛出一个我觉得很奇怪的错误。

Client.py

try:
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:2222")
print("Sending request")
socket.send(b"send the message")
message = socket.recv(flags=zmq.NOBLOCK)
print("Received reply %s " % message)
except Exception as e:
print(str(e))

Server.py

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:2222")
while True:
message = socket.recv()
socket.send(b"Ack")

我认为客户端应该收到Ack并打印它,而不是抛出异常。

该文件说,

使用flags=NOBLOCK,如果没有消息到达,则会引发ZMQError

显然,服务器在收到消息后立即响应"Ack"。

错误消息是,

资源暂时不可用

请记住,在并发环境中,无法保证独立进程的执行顺序。即使您立即响应server.py中的消息,响应也可能在您呼叫socket.recv之前到达接收套接字。当您调用socket.send消息需要通过网络发送到服务器时,服务器需要创建消息并响应,然后消息需要通过网络返回到客户端代码。通过网络发送消息的时间会很长,您在socket.send后立即呼叫socket.recv

因此,实际上,当您调用message = socket.recv(flags=zmq.NOBLOCK)时,客户端socket尚未从服务器收到Ack,并且由于您正在使用NOBLOCK因此会抛出错误,因为socket上没有收到任何消息。

在这种情况下,NOBLOCK可能不合适。您可以通过在sendrecv之间添加sleep调用来试验这一点,以表明等待服务器响应的时间延迟确实是问题所在,但对于您的客户端代码来说,这不是一个好的解决方案。

如果要在等待一定时间后退出,则应改用socket.poll

event = socket.poll(timeout=3000)  # wait 3 seconds
if event == 0:
# timeout reached before any events were queued
pass
else:
# events queued within our time limit
msg = socket.recv()

Pyzmq Doc forsocket.poll()

假设服务器未启动,在这种情况下,客户端中的recv()将被永久阻止,这是我不想要的。

ZeroMQ是在分布式系统中执行智能信令/消息传递的绝佳框架

让我们勾勒出一个主要非阻塞操作方法的演示,其中包含一些关于如何在进程终止之前获取和优雅释放资源的灵感。

也许在不到五秒钟的时间内阅读一些关于 ZeroMQ 层次结构中主要概念差异的内容也会有所帮助。

Server.py

aContext     = zmq.Context()
aLightHouse  =    aContext.socket( zmq.PUB )
aRepSocket   =    aContext.socket( zmq.REP )
aRepSocket.setsockopt(             zmq.LINGER,   0 )
aRepSocket.setsockopt(             zmq.COMPLETE, 1 )
aRepSocket.bind(                  "tcp://*:2222" )
aLightHouse.bind(                 "tcp://*:3333" )
aLightHouse.setsockopt(            zmq.LINGER,   0 )
aLightHouse.setsockopt(            zmq.CONFLATE, 1 )
aLightHouse_counter = 0
#------------------------------------------------------------
print( "INF: Server InS: ZeroMQ({0:}) going RTO:".format( zmq.zmq_version() )  )
#------------------------------------------------------------
while True:
try:
aLightHouse_counter += 1
aLightHouse.send( "INF: server-RTO blink {0:}".format( repr( aLightHouse_counter ) ),
zmq.NOBLOCK
)
if ( 0 < aRepSocket.poll( 0, zmq.POLLIN ) ):
try:
message = aRepSocket.recv(         zmq.NOBLOCK ); print( "INF: .recv()ed {0:}".format( message ) )
pass;     aRepSocket.send( b"Ack", zmq.NOBLOCK ); print( "INF: .sent() ACK" )
except:
# handle EXC: based on ...
print(  "EXC: reported as Errno == {0:}".format( zmq.zmq_errno() ) )
else:
# NOP / Sleep / do other system work-units to get processed during the infinite-loop
except:
# handle EXC:
print(  "EXC: will break ... and terminate OoS ..." )
break
#------------------------------------------------------------
print( "INF: will soft-SIG Server going-OoS..." )
aLightHouse.send(   "INF: server goes OoS ... " )
#------------------------------------------------------------
print( "INF: will .close() and .term() resources on clean & graceful exit..." )
Sleep( 0.987654321 )
aRepSocket.unbind(  "tcp://*:2222" )
aRepSocket.close()
aLightHouse.unbind( "tcp://*:3333" )
aLightHouse.close()
aContext.term()
#------------------------------------------------------------
print( "INF: over and out" )

Client.py

try:
aContext   = zmq.Context()
aReqSocket =    aContext.socket( zmq.REQ )
aBeeper    =    aContext.socket( zmq.SUB )
aReqSocket.setsockopt(           zmq.LINGER,   0 )
aReqSocket.setsockopt(           zmq.COMPLETE, 1 )
aReqSocket.connect(             "tcp://localhost:2222" )
aBeeper.connect(                "tcp://localhost:3333" )
aBeeper.setsockopt(              zmq.SUBSCRIBE, "" )
aBeeper.setsockopt(              zmq.CONFLATE, 1 )
#------------------------------------------------------------
print( "INF: Client InS: ZeroMQ({0:}) going RTO.".format( zmq.zmq_version() )  )
#------------------------------------------------------------
try:
while True:
if ( 0 == aBeeper.poll( 1234 ) ):
print( "INF: Server OoS or no beep visible within a LoS for the last 1234 [ms] ... " )
else:
print( "INF: Server InS-beep[{0:}]".format( aBeeper.recv( zmq.NOBLOCK ) ) )
try:
print( "INF: Going to sending a request" )
aReqSocket.send( b"send the message", zmq.NOBLOCK )
print( "INF: Sent. Going to poll for a response to arrive..." )
while ( 0 == aReqSocket.poll( 123, zmq.POLLIN ) ):
print( "INF:  .poll( 123 ) = 0, will wait longer ... " )
message = socket.recv( flags = zmq.NOBLOCK )
print( "INF: Received a reply %s " % message )

except Exception as e:
print( "EXC: {0:}".format( str( e ) ) )
print( "INF: ZeroMQ Errno == {0:}".format( zmq.zmq_errno() ) )
print( "INF: will break and terminate" )
break
except Exception as e:
print( "EXC: {0:}".format( str( e ) ) )
finally:
#------------------------------------------------------------
print( "INF: will .close() and .term() resources on clean & graceful exit..." )
aBeeper.close()
aReqSocket.close()
aContext.term()
#------------------------------------------------------------
print( "INF: over and out" )

您使用的是非阻塞模式,这意味着它会引发错误通知您,该消息无法执行任何操作,您应该稍后重试,但是如果您使用的是阻塞模式,它会阻止直到对等方连接。

这个答案来自这里

基本上,如果您删除flags=zmq.NOBLOCK它将起作用。

更新

如果你想使用非阻塞模式,你应该看看这个

最新更新