python 2.7多处理.另一个池中的池.apply_async



python 2.7多处理。另一个pool.apply_async 内的pool.apply_async

我有两个模块A和B。模块"A"声明一个大小为100的池,并使用pool.apply_async,调用模块"B"中的函数"BX"。模块"B"中的函数"BX"创建另一个大小为n的池,并使用其pool.apply_async调用另一个函数。

现在面临的这个问题是,执行过程在模块"B"中声明池时停止/退出。即在声明模块"B"中的第二个池时,该池位于模块"A"的第一个池内。

在pool.apply_async中执行pool.applie_async有什么解决方案吗?

您必须执行以下操作。请注意,我使用自己的multiprocessing分支(称为pathos),因为它提供了更好的序列化,使您能够从解释器运行,并且在创建和维护池方面更高效。然而,工作流程应该与此处所示大致相同:

>>> import pathos as p
>>> tp1 = p.pools.ThreadPool(100)
>>> tp2 = p.pools.ThreadPool(50)
>>> import itertools as it
>>>
>>> res = tp1.amap(tp2.amap, it.repeat(lambda x:x**2), [range(10)]*5)
>>> ans = res.get()
>>> [v.get() for v in ans]
[[0, 1, 4, 9, 16, 25, 36, 49, 64, 81], [0, 1, 4, 9, 16, 25, 36, 49, 64, 81], [0, 1, 4, 9, 16, 25, 36, 49, 64, 81], [0, 1, 4, 9, 16, 25, 36, 49, 64, 81], [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]]

注意,pathos中的amapmultiprocessing中的map_async。如果你只想要apply_async,那就是pathos中的apipe。然后关闭:

>>> tp1.close()
>>> tp2.close()
>>> tp1.join() 
>>> tp2.join()
>>> # pathos has an additional shutdown step to clear the pool cache
>>> tp1.clear()
>>> tp2.clear()

因此,对于apply_async的等价物,它看起来是这样的:

>>> tp1 = p.pools.ThreadPool(100)
>>> tp2 = p.pools.ThreadPool(50)
>>> tp1.apipe(tp2.apipe, lambda x:x**2, 10).get().get()
100
>>> tp1.close()
>>> tp2.close()
>>> tp1.join()
>>> tp2.join()
>>> tp1.clear()
>>> tp2.clear()

最新更新