>我有一个应用程序需要基于 websocket 流执行一些处理器密集型工作。我想通过多处理并行化 CPU 密集型位,但我仍然需要异步接口来处理应用程序的流部分。为了解决这个问题,我希望制作一个等待的多处理版本。异步结果(multiprocessing.pool.Pool.submit_async操作的结果(。但是,我遇到了一些奇怪的行为。
我的新可等待池结果(这是 asyncio 的子类。未来(只要在我开始等待之前返回池结果即可正常工作。但是,如果我尝试在池结果返回之前等待它,那么程序似乎会在 await 语句上无限期地停止。
我已经用下一个(未来。async((( 并且迭代器在池处理完成之前返回未来实例本身,并在之后引发 StopIterationError,正如我所期望的那样。
代码如下。
import multiprocessing
import multiprocessing.pool
import asyncio
import time
class Awaitable_Multiprocessing_Pool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
def apply_awaitably(self, func, args = list(), kwargs = dict()):
return Awaitable_Multiprocessing_Pool_Result(
self,
func,
args,
kwargs)
class Awaitable_Multiprocessing_Pool_Result(asyncio.Future):
def __init__(self, pool, func, args = list(), kwargs = dict()):
asyncio.Future.__init__(self)
self.pool_result = pool.apply_async(
func,
args,
kwargs,
self.set_result,
self.set_exception)
def result(self):
return self.pool_result.get()
def done(self):
return self.pool_result.ready()
def dummy_processing_fun():
import time
print('start processing')
time.sleep(4)
print('finished processing')
return 'result'
if __name__ == '__main__':
async def main():
ah = Async_Handler(1)
pool = Awaitable_Multiprocessing_Pool(2)
while True:
future = pool.apply_awaitably(dummy_processing_fun, [])
# print(next(future.__await__())) # would show same as print(future)
# print(await future) # would stall indefinitely because pool result isn't in
time.sleep(10) # NOTE: you may have to make this longer to account for pool startup time on the first iteration
# print(next(future.__await__())) # would raise StopIteration
print(await future) # prints 'result'
asyncio.run(main())
我在这里错过了一些明显的东西吗?我认为我拥有可等待正常工作的所有基本要素,部分原因是在某些情况下我可以成功等待。有人有见解吗?
我不知道你为什么让它变得如此复杂......下面的代码怎么样?
from concurrent.futures import ProcessPoolExecutor
import asyncio
import time
def dummy_processing_fun():
import time
print('start processing')
time.sleep(4)
print('finished processing')
return 'result'
if __name__ == '__main__':
async def main():
pool = ProcessPoolExecutor(2)
while True:
future = pool.submit(dummy_processing_fun)
future = asyncio.wrap_future(future)
# print(next(future.__await__())) # would show same as print(future)
# print(await future) # would stall indefinitely because pool result isn't in
# time.sleep(5)
# print(next(future.__await__())) # would raise StopIteration
print(await future) # prints 'result'
asyncio.run(main())