Futures:从不同线程设置结果



我有一个http服务器,它在线程中接收一些请求。这个http服务器解析接收到的数据,并将部分数据发送到另一个进程。

来自该进程的响应可能需要一些时间,并在不同线程的回调中接收。

这里的问题是,我需要回调函数中收到的部分响应来响应我收到的初始请求,所以我的想法是使用异步。此的任务

我的代码如下:

my_futures = dict()
class HTTPHandler(http.server.BaseHTTPRequestHandle):
do_POST(self):
#parse data
asyncio.create_task(self.send_response())
async def send_response(self):
global my_futures
my_futures[uuid] = asyncio.get_event_loop().create_future()
send_data_to_processB()
await my_future
self.send_response()
# Callback where I receive the result from process B
def on_message(message):
global my_futures
my_futures[message['uuid']].set_result(message['result'])
if __name__ == '__main__':
server = http.server.HTTPServer(LISTEN_ADDR, MyHTTPHandler)
threading.Thread(target=server.serve_forever).start()
processB.register_callback(on_message)

这种方法的问题是任务没有被执行,回调根本没有收到结果。

我还尝试将do_POST方法更改为使用asyncio.run(self.send_response())而不是asyncio.create_task,使用这种方法,我的回调获取结果并设置结果,但协程仅挂在await my_future

我怎样才能完成这项任务?

要从事件循环线程外部与异步交互,请使用call_soon_threadsafe:

def on_message(message):
# my_loop must have been initialized from the main thread
my_loop.call_soon_threadsafe(
my_futures[message['uuid']].set_result, message['result'])

调用asyncio.run_coroutine_threadsafe而不是create_task

然而,这并不是代码的唯一问题。您从来没有运行事件循环,所以这些都没有机会执行。您需要在代码中的某个位置具有asyncio.run或等效代码,通常位于顶层。

由于HTTPServer不是为asyncio编写的,因此不能从do_POST调用asyncio函数。相反,考虑使用aiohttp

同上,但无需保存对事件循环的引用

future = my_futures[message['uuid']]
result = message['result']
future.get_loop().call_soon_threadsafe(future.set_result, result)

最新更新