add_done_callback的Python3异步回调不会更新服务器类中的自变量



我有两个服务器,使用asyncio.start_server创建:asyncio.start_server(self.handle_connection, host = host, port = port)并在一个循环中运行:

loop.run_until_complete(asyncio.gather(server1, server2))
loop.run_forever()

我正在使用asyncio。排队以在服务器之间进行通信。通过queue.put(msg)添加的来自Server2的消息由Server1中的queue.get()成功接收。我正在通过asyncio.ensure_future运行queue.get(),并使用作为的回调来自Server1:的add_done_callback方法

def callback(self, future):
msg = future.result()
self.msg = msg

但是这个callback没有按预期工作-self.msg没有更新。我做错了什么?

更新并添加额外代码以显示最大完整示例:

class Queue(object):
def __init__(self, loop, maxsize: int):
self.instance = asyncio.Queue(loop = loop, maxsize = maxsize)
async def put(self, data):
await self.instance.put(data)
async def get(self):
data = await self.instance.get()
self.instance.task_done()
return data
@staticmethod
def get_instance():
return Queue(loop = asyncio.get_event_loop(), maxsize = 10)

服务器类别:

class BaseServer(object):
def __init__(self, host, port):
self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
pass
def get_instance(self):
return self.instance
@staticmethod
def create():
return BaseServer(None, None)

接下来我要运行服务器:

loop.run_until_complete(asyncio.gather(server1.get_instance(), server2.get_instance()))
loop.run_forever()

在服务器2的handle_connection中,我调用queue.put(msg),在服务器1的handle_connection中,我将queue.get()注册为任务:

task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)

server1:的process_queue方法

def process_queue(self, future):
msg = future.result()
self.msg = msg

server1:的handle_connection方法

async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
while self.msg != SPECIAL_VALUE:
# doing something

尽管task_queue已完成,但self.process_queue已调用,self.msg从不更新。

基本上,由于您使用异步结构,我认为您可以直接等待结果:

async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
msg = await queue.get()
process_queue(msg)  # change it to accept real value instead of a future.
# do something

相关内容

  • 没有找到相关文章

最新更新