我尝试使用ProcessPoolExecutor
结合阻塞任务和非阻滞(I/O绑定(任务,并发现其行为非常出乎意料。
class BlockingQueueListener(BaseBlockingListener):
def run(self):
# Continioulsy listening a queue
blocking_listen()
class NonBlockingListener(BaseNonBlocking):
def non_blocking_listen(self):
while True:
await self.get_message()
def run(blocking):
blocking.run()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor()
blocking = BlockingQueueListener()
non_blocking = NonBlockingListener()
future = loop.run_in_executor(executor, run(blocking))
loop.run_until_complete(
asyncio.gather(
non_blocking.main(),
future
)
)
我期望这两个任务都会同时控制,但是阻止任务始于ProcessPoolExecutor
块,永远不会返回控制。怎么会发生?在多处理遗嘱执行者中开始使用普通的统治和期货的正确方法?
此行:
future = loop.run_in_executor(executor, run(blocking))
实际上将运行阻止功能并将其结果提供给执行者。
根据文档,您需要明确传递函数,然后是其参数。
future = loop.run_in_executor(executor, run, blocking)