我有一个从服务器接收数据的asyncio.Protocol
子类。我将这些数据(每行,因为数据是文本)存储在asyncio.Queue
中。
import asyncio
q = asyncio.Queue()
class StreamProtocol(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
for message in data.decode().splitlines():
yield q.put(message.rstrip())
def connection_lost(self, exc):
self.loop.stop()
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
'127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
我想让另一个协程负责使用队列中的数据并对其进行处理。
- 这应该是一个
asyncio.Task
吗? - 如果队列因为几秒钟内没有收到数据而变为空,该怎么办?如何确保我的消费者不会停止(
run_until_complete
)? - 有没有比为我的队列使用全局变量更干净的方法?
这应该是一个异步。任务?
是的,使用 asyncio.ensure_future 或loop.create_task创建它。
如果队列因为几秒钟内没有收到数据而变为空,该怎么办?
只需使用 queue.get 等待项目可用:
async def consume(queue):
while True:
item = await queue.get()
print(item)
有没有比为我的队列使用全局变量更干净的方法?
是的,只需将其作为参数传递给使用者协程和流协议:
class StreamProtocol(asyncio.Protocol):
def __init__(self, loop, queue):
self.loop = loop
self.queue = queue
def data_received(self, data):
for message in data.decode().splitlines():
self.queue.put_nowait(message.rstrip())
def connection_lost(self, exc):
self.loop.stop()
如何确保我的消费者不会停止 (run_until_complete)?
关闭连接后,使用 queue.join 等待队列为空。
完整示例:
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
# Connection coroutine
factory = lambda: StreamProtocol(loop, queue)
connection = loop.create_connection(factory, '127.0.0.1', '42')
# Consumer task
consumer = asyncio.ensure_future(consume(queue))
# Set up connection
loop.run_until_complete(connection)
# Wait until the connection is closed
loop.run_forever()
# Wait until the queue is empty
loop.run_until_complete(queue.join())
# Cancel the consumer
consumer.cancel()
# Let the consumer terminate
loop.run_until_complete(consumer)
# Close the loop
loop.close()
或者,您也可以使用流:
async def tcp_client(host, port, loop=None):
reader, writer = await asyncio.open_connection(host, port, loop=loop)
async for line in reader:
print(line.rstrip())
writer.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client('127.0.0.1', 42, loop))
loop.close()