我使用pyzmq创建了简单的客户端/服务器。
我不确定的一件事是,即使消息是从服务器发送的,.recv()
也不会收到消息。它只是忽略它并抛出一个我觉得很奇怪的错误。
Client.py
try:
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:2222")
print("Sending request")
socket.send(b"send the message")
message = socket.recv(flags=zmq.NOBLOCK)
print("Received reply %s " % message)
except Exception as e:
print(str(e))
Server.py
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:2222")
while True:
message = socket.recv()
socket.send(b"Ack")
我认为客户端应该收到Ack并打印它,而不是抛出异常。
该文件说,
使用
flags=NOBLOCK
,如果没有消息到达,则会引发ZMQError
显然,服务器在收到消息后立即响应"Ack"。
错误消息是,
资源暂时不可用
请记住,在并发环境中,无法保证独立进程的执行顺序。即使您立即响应server.py
中的消息,响应也可能在您呼叫socket.recv
之前到达接收套接字。当您调用socket.send
消息需要通过网络发送到服务器时,服务器需要创建消息并响应,然后消息需要通过网络返回到客户端代码。通过网络发送消息的时间会很长,您在socket.send
后立即呼叫socket.recv
。
因此,实际上,当您调用message = socket.recv(flags=zmq.NOBLOCK)
时,客户端socket
尚未从服务器收到Ack
,并且由于您正在使用NOBLOCK
因此会抛出错误,因为socket
上没有收到任何消息。
在这种情况下,NOBLOCK
可能不合适。您可以通过在send
和recv
之间添加sleep
调用来试验这一点,以表明等待服务器响应的时间延迟确实是问题所在,但对于您的客户端代码来说,这不是一个好的解决方案。
如果要在等待一定时间后退出,则应改用socket.poll
。
event = socket.poll(timeout=3000) # wait 3 seconds
if event == 0:
# timeout reached before any events were queued
pass
else:
# events queued within our time limit
msg = socket.recv()
Pyzmq Doc forsocket.poll()
问:假设服务器未启动,在这种情况下,客户端中的
recv()
将被永久阻止,这是我不想要的。
ZeroMQ是在分布式系统中执行智能信令/消息传递的绝佳框架
让我们勾勒出一个主要非阻塞操作方法的演示,其中包含一些关于如何在进程终止之前获取和优雅释放资源的灵感。
也许在不到五秒钟的时间内阅读一些关于 ZeroMQ 层次结构中主要概念差异的内容也会有所帮助。
Server.py
aContext = zmq.Context()
aLightHouse = aContext.socket( zmq.PUB )
aRepSocket = aContext.socket( zmq.REP )
aRepSocket.setsockopt( zmq.LINGER, 0 )
aRepSocket.setsockopt( zmq.COMPLETE, 1 )
aRepSocket.bind( "tcp://*:2222" )
aLightHouse.bind( "tcp://*:3333" )
aLightHouse.setsockopt( zmq.LINGER, 0 )
aLightHouse.setsockopt( zmq.CONFLATE, 1 )
aLightHouse_counter = 0
#------------------------------------------------------------
print( "INF: Server InS: ZeroMQ({0:}) going RTO:".format( zmq.zmq_version() ) )
#------------------------------------------------------------
while True:
try:
aLightHouse_counter += 1
aLightHouse.send( "INF: server-RTO blink {0:}".format( repr( aLightHouse_counter ) ),
zmq.NOBLOCK
)
if ( 0 < aRepSocket.poll( 0, zmq.POLLIN ) ):
try:
message = aRepSocket.recv( zmq.NOBLOCK ); print( "INF: .recv()ed {0:}".format( message ) )
pass; aRepSocket.send( b"Ack", zmq.NOBLOCK ); print( "INF: .sent() ACK" )
except:
# handle EXC: based on ...
print( "EXC: reported as Errno == {0:}".format( zmq.zmq_errno() ) )
else:
# NOP / Sleep / do other system work-units to get processed during the infinite-loop
except:
# handle EXC:
print( "EXC: will break ... and terminate OoS ..." )
break
#------------------------------------------------------------
print( "INF: will soft-SIG Server going-OoS..." )
aLightHouse.send( "INF: server goes OoS ... " )
#------------------------------------------------------------
print( "INF: will .close() and .term() resources on clean & graceful exit..." )
Sleep( 0.987654321 )
aRepSocket.unbind( "tcp://*:2222" )
aRepSocket.close()
aLightHouse.unbind( "tcp://*:3333" )
aLightHouse.close()
aContext.term()
#------------------------------------------------------------
print( "INF: over and out" )
Client.py
try:
aContext = zmq.Context()
aReqSocket = aContext.socket( zmq.REQ )
aBeeper = aContext.socket( zmq.SUB )
aReqSocket.setsockopt( zmq.LINGER, 0 )
aReqSocket.setsockopt( zmq.COMPLETE, 1 )
aReqSocket.connect( "tcp://localhost:2222" )
aBeeper.connect( "tcp://localhost:3333" )
aBeeper.setsockopt( zmq.SUBSCRIBE, "" )
aBeeper.setsockopt( zmq.CONFLATE, 1 )
#------------------------------------------------------------
print( "INF: Client InS: ZeroMQ({0:}) going RTO.".format( zmq.zmq_version() ) )
#------------------------------------------------------------
try:
while True:
if ( 0 == aBeeper.poll( 1234 ) ):
print( "INF: Server OoS or no beep visible within a LoS for the last 1234 [ms] ... " )
else:
print( "INF: Server InS-beep[{0:}]".format( aBeeper.recv( zmq.NOBLOCK ) ) )
try:
print( "INF: Going to sending a request" )
aReqSocket.send( b"send the message", zmq.NOBLOCK )
print( "INF: Sent. Going to poll for a response to arrive..." )
while ( 0 == aReqSocket.poll( 123, zmq.POLLIN ) ):
print( "INF: .poll( 123 ) = 0, will wait longer ... " )
message = socket.recv( flags = zmq.NOBLOCK )
print( "INF: Received a reply %s " % message )
except Exception as e:
print( "EXC: {0:}".format( str( e ) ) )
print( "INF: ZeroMQ Errno == {0:}".format( zmq.zmq_errno() ) )
print( "INF: will break and terminate" )
break
except Exception as e:
print( "EXC: {0:}".format( str( e ) ) )
finally:
#------------------------------------------------------------
print( "INF: will .close() and .term() resources on clean & graceful exit..." )
aBeeper.close()
aReqSocket.close()
aContext.term()
#------------------------------------------------------------
print( "INF: over and out" )
您使用的是非阻塞模式,这意味着它会引发错误通知您,该消息无法执行任何操作,您应该稍后重试,但是如果您使用的是阻塞模式,它会阻止直到对等方连接。
这个答案来自这里
基本上,如果您删除flags=zmq.NOBLOCK
它将起作用。
更新
如果你想使用非阻塞模式,你应该看看这个