多处理在任务完成后停止生成新进程



你好,我有一个多处理程序,比如

#this is pseudocode-ish
def worker(queue, context):
set_context(context) #set global context within worker
while queue.qsize() > 0:
process(queue.get(False))
pool = multiprocessing.Pool(20, worker, (queue, global_context))
pool.close()
pool.join()

问题是global context是一个非常重的物体,因此生成每个单独的过程(酸洗/脱洗(需要一段时间。所以我发现,对于较短的队列,整个队列由前几个生成的进程处理,然后程序的其余部分卡在生成其余进程时,这不可避免地什么都不做,因为队列中没有任何东西。例如,每个进程需要 1 秒生成,但队列在 2 秒内处理 - 因此前两个进程在 2-3 秒内完成队列,然后程序的其余部分需要 17 秒来生成其余队列。

有没有办法在队列为空时杀死其余进程?或者一种更灵活的方法来设置池进程数 - 例如,仅在需要时生成另一个进程?

谢谢

没有办法使用multiprocessing.Pool即时生成该过程。如果需要此类行为,则需要自行修改它。

对于关机,一种方法是使用multiprocessing.Pool.terminate方法。但它可能会等待所有worker完成初始化。

您也可以在工作完成后直接杀死所有工人。我认为有一个_pool字段包含您可以强制终止的所有工作Process。请注意,这可能会导致一些奇怪的行为,因为它不打算在外部处理。您必须确保正确清理所有管理thread,这可能很棘手。

您的设计选择非常不寻常。您正在复制call_queue。事实上,Pool应该自己处理通信,你不需要额外的queue.如果所有 tak 都在task_list并且需要由process_task处理,您可以执行以下操作

#this is pseudocode-ish
def init(queue, context):
set_context(context) # set global context within worker
pool = multiprocessing.Pool(20, init, (global_context,))
res = pool.map(process_task, task_list)
pool.terminate()
pool.join()

这样就可以避免破坏Pool设置,并且可能更有效率。

最后,如果您打算多次重复使用池并且global_context没有更改,则可以考虑使用loky.(免责声明:我是这个项目的维护者之一(。 这允许您在程序中多次重用工作线程池,而无需重新设置所有内容。 一个问题是没有initializer因为它遵循concurrent.futures的 API,但可以使用multiprocessing.Barrier并提交max_workers初始值设定项作业来完成initializer。这将确保initializer的每个作业都由一个工作人员运行,并且所有工作人员都运行initializer

相关内容

  • 没有找到相关文章

最新更新