Python-将多处理与异步相结合只在某些时候有效



我想将异步和多处理结合起来,因为我有一个任务,其中一部分是io绑定的,另一部分是cpu绑定的。我第一次尝试使用loop.run_in_executor((,但可能无法使用它。相反,我创建了两个进程,一个使用异步,另一个不使用。

代码是这样的,我有一个带有一些非阻塞函数和一个阻塞的类。我有一个异步。队列以在非阻塞部分和多处理之间传递信息。队列以在非阻塞函数和阻塞函数之间传递信息。

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

class TestClass:
def __init__(self):
m = mp.Manager()
self.blocking_queue = m.Queue()
async def run(self):
loop = asyncio.get_event_loop()
self.non_blocking_queue = asyncio.Queue() # asyncio Queue must be declared within event loop
task1 = loop.create_task(self.non_blocking1())
task2 = loop.create_task(self.non_blocking2())
task3 = loop.create_task(self.print_msgs())
await asyncio.gather(task1, task2)
task3.cancel()
def blocking(self):
i = 0
while i < 5:
time.sleep(0.6)
i += 1
print("Blocking ", i)
line = self.blocking_queue.get()
print("Blocking: ", line)
print("blocking done")
async def non_blocking1(self):
for i in range(5):
await self.non_blocking_queue.put("Hello")
await asyncio.sleep(0.4)
async def non_blocking2(self):
for i in range(5):
await self.non_blocking_queue.put("World")
await asyncio.sleep(0.5)
async def print_msgs(self):
while True:
line = await self.non_blocking_queue.get()
self.blocking_queue.put(line)
print(line)

test_class = TestClass()
with ProcessPoolExecutor() as pool:
pool.submit(test_class.blocking)
pool.submit(asyncio.run(test_class.run()))
print("done")

大约有一半的时间我运行这个,它运行良好,并打印出阻塞和非阻塞队列中的文本。另一半只打印出非阻塞队列的结果。看起来阻塞过程根本没有启动。这不是每隔一次都会发生的。它可能会连续工作五次,然后连续不工作五次。

是什么原因导致了这样的问题?同时使用多处理和异步,哪种更好的方法可以做到这一点?

运行异步任务"内部";另一个过程对我有效,例如:

def runfn(fn):
return asyncio.run(fn())
with ProcessPoolExecutor() as pool:
pool.submit(test_class.blocking)
pool.submit(runfn, test_class.run)

据推测,asyncio/任务内部存在某种状态,需要保持一致,或者在另一个进程中运行时出现故障

相关内容

最新更新