你好,我有一个多处理程序,比如
#this is pseudocode-ish
def worker(queue, context):
set_context(context) #set global context within worker
while queue.qsize() > 0:
process(queue.get(False))
pool = multiprocessing.Pool(20, worker, (queue, global_context))
pool.close()
pool.join()
问题是global context
是一个非常重的物体,因此生成每个单独的过程(酸洗/脱洗(需要一段时间。所以我发现,对于较短的队列,整个队列由前几个生成的进程处理,然后程序的其余部分卡在生成其余进程时,这不可避免地什么都不做,因为队列中没有任何东西。例如,每个进程需要 1 秒生成,但队列在 2 秒内处理 - 因此前两个进程在 2-3 秒内完成队列,然后程序的其余部分需要 17 秒来生成其余队列。
有没有办法在队列为空时杀死其余进程?或者一种更灵活的方法来设置池进程数 - 例如,仅在需要时生成另一个进程?
谢谢
没有办法使用multiprocessing.Pool
即时生成该过程。如果需要此类行为,则需要自行修改它。
对于关机,一种方法是使用multiprocessing.Pool.terminate
方法。但它可能会等待所有worker
完成初始化。
您也可以在工作完成后直接杀死所有工人。我认为有一个_pool
字段包含您可以强制终止的所有工作Process
。请注意,这可能会导致一些奇怪的行为,因为它不打算在外部处理。您必须确保正确清理所有管理thread
,这可能很棘手。
您的设计选择非常不寻常。您正在复制call_queue
。事实上,Pool
应该自己处理通信,你不需要额外的queue
.如果所有 tak 都在task_list
并且需要由process_task
处理,您可以执行以下操作
#this is pseudocode-ish
def init(queue, context):
set_context(context) # set global context within worker
pool = multiprocessing.Pool(20, init, (global_context,))
res = pool.map(process_task, task_list)
pool.terminate()
pool.join()
这样就可以避免破坏Pool
设置,并且可能更有效率。
最后,如果您打算多次重复使用池并且global_context没有更改,则可以考虑使用loky
.(免责声明:我是这个项目的维护者之一(。 这允许您在程序中多次重用工作线程池,而无需重新设置所有内容。 一个问题是没有initializer
因为它遵循concurrent.futures
的 API,但可以使用multiprocessing.Barrier
并提交max_workers
初始值设定项作业来完成initializer
。这将确保initializer
的每个作业都由一个工作人员运行,并且所有工作人员都运行initializer
。