pathos 池:在 N 个任务后续订工作进程



我正在构建一个并行的python应用程序,它本质上是在外部库周围调用C包装器。需要并行性才能在所有 CPU 内核上同时运行计算。

我最终使用了pathos.multiprocessing.ProcessPool,但是这些池缺少标准multiprocessing.Pool类构造函数的maxtaskperchild参数(请参阅此处的参考(。我需要这个功能,因为 C 库依赖于进程时钟来定义一些执行时间限制,这些限制最终会在任务堆积时达到。

有没有办法要求ProcessPool经理在给定数量的任务后续订工作进程?

阐明我的意图的示例代码:

from pathos.pools import ProcessPool
from os import getpid
import collections
def print_pid(task_id):
pid = getpid()
return pid
if __name__ == "__main__":
NUM_TASKS = 50
MAX_PER_CHILD = 2

# limit each process to maximum MAX_PER_CHILD tasks
# we would like the pool to exit the process and spawn a new one
# when a task counter reaches the limit
# below argument 'maxtasksperchild' would work with standard 'multiprocessing'
pool = ProcessPool(ncpu=2, maxtasksperchild=MAX_PER_CHILD)
results = pool.map(print_pid, range(NUM_TASKS), chunksize=1)
tasks_per_pid = dict(collections.Counter(results))
print(tasks_per_pid)
# printed result
# {918: 8, 919: 6, 920: 6, 921: 6, 922: 6, 923: 6, 924: 6, 925: 6}
# observe that all processes did more than MAX_PER_CHILD tasks

我尝试了什么

  • ProcessPool构造函数中设置maxtasksperchild(参见上面的朴素示例(似乎没有任何作用
  • 在worker函数中调用sys.exit()会使程序挂起
  • 我在深入研究源代码时发现了提示

pathos.multiprocessing中有两个池:ProcessPool_ProcessPool. 前者旨在具有增强的池生命周期,以最小化启动时间,并具有持久性和重新启动功能 - 但是缺少一些"multiprocessing"关键字。后者(_ProcessPool(是API设计的下一级,并提供与multiprocessingPool接口相同的接口(但使用dill(。 所以,看看_ProcessPool.

相关内容

  • 没有找到相关文章

最新更新