如何在带有asyncio的pyzmq中使用REQ和REP



我正试图在python3.5中使用pyzmq和asyncio实现异步客户端和服务器。我使用了zmq提供的异步库。下面是我为客户端(requester.py(和服务器(responser.py(编写的代码。我的要求是只使用REQ和REP zmq套接字来实现异步客户端-服务器。

请求者.py:

import asyncio
import zmq
import zmq.asyncio
async def receive():
message = await socket.recv()
print("Received reply ", "[", message, "]")
return message
async def send(i):
print("Sending request ", i,"...")
request = "Hello:" + str(i)
await socket.send(request.encode('utf-8'))
print("sent:",i)
async def main_loop_num(i):
await send(i)
#  Get the reply.
message = await receive()
print("Message :", message)
async def main():
await asyncio.gather(*(main_loop_num(i) for i in range(1,10)))                
port = 5556
context = zmq.asyncio.Context.instance()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:%d" % port)
asyncio.get_event_loop().run_until_complete(asyncio.wait([main()]))

应答器.py:

import asyncio
import zmq
import zmq.asyncio
async def receive():
message = await socket.recv()
print("Received message:", message)
await asyncio.sleep(10)
print("Sleep complete")
return message
async def main_loop():
while True:
message = await receive()
print("back to main loop")
await socket.send(("World from %d" % port).encode('utf-8'))
print("sent back")
port = 5556
context = zmq.asyncio.Context.instance()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%d" % port)
asyncio.get_event_loop().run_until_complete(asyncio.wait([main_loop()]))

我得到的输出是:

requester.py:

Sending request  5 ...
sent: 5
Sending request  6 ...
Sending request  1 ...
Sending request  7 ...
Sending request  2 ...
Sending request  8 ...
Sending request  3 ...
Sending request  9 ...
Sending request  4 ...

responser.py:

Received message: b'Hello:5'
Sleep complete
back to main loop
sent back

从输出中,我假设请求者已经发送了多个请求,但只有第一个请求到达了响应者。此外,响应者为第一个请求发送的响应甚至还没有返回到请求者。为什么会发生这种情况?我在所有可能的地方都使用了异步方法,但是send((和recv((方法并没有异步运行。是否可以在不使用任何其他套接字(如路由器、经销商等(的情况下制作异步req-rep?

ZMQ REQ-REP套接字需要一个严格的顺序,即一个请求-一个回复-一个请求–一个回复。。。

您的requester.py并行启动所有10个请求:

await asyncio.gather(*(main_loop_num(i) for i in range(1,10)))

当发送第二个请求时,ZMQ抱怨这一点:

zmq.error.ZMQError: Operation cannot be accomplished in current state

尝试更改您的主要功能,一次发送一个请求:

async def main():
for i in range(1, 10):
await main_loop_num(i)

如果需要并行发送多个请求,则不能使用REQ-REP套接字对,而是使用DEALER-REP套接字对。

最新更新