Python多线程-如何控制并行子进程的数量



这里是一个多处理的角落,我希望我的问题问得足够清楚。

我使用Python多处理来并行化一个函数,该函数将来自多个源的数据附加到文本文件中。执行顺序(以及获得锁(在这种情况下并不重要(据我所知(,因为子流程可以完全独立地工作。

for municipality in municipalityDictionary.keys():
proc = mp.Process(
target= f_unction,
args = (arg1, arg2, arg3)
)
procs.append(proc)  
for proc in procs:
roc.start()
for proc in procs:
roc.join()

但是,这段代码创建了许多等于len(procs(的子流程。我正在寻找一种控制并行运行的进程数量的方法(例如,有一个multiprocessing.cpu_count((+/-n个处理过的进程,我将向其提供所有进程(。

此外,所有并行函数都需要不同的时间来执行,因此在多处理.cpu_count((的批处理中应用proc.join((将导致必须等待所有进程中最慢的进程完成,然后才能继续执行主进程(将下一批处理获取到.start((和.join(.

我已经阅读了Python多处理文档,并浏览了多个实现示例(包括Manager((、Queue((&Pool(((,但我看到的例子是以不同的方式上下文化的,我不知道如何将它们应用于我的用例(可能是我(。

谢谢!

实际上,您的问题并不完全清楚,因此我包含了几个代码变体,试图涵盖您可能想要的内容。

检查您的代码会让人相信您正在重复地向每个进程传递相同的arg1arg2arg3值,而实际情况可能并非如此。如果是这样的话,那么下面的代码将实现你想要的:

from multiprocessing import Pool
from functools import partial
def main():
worker = partial(f_function, arg1, arg2, arg3)
# pool size is equal to multiprocessing.cpu_count():
pool = Pool() 
for municipality in municipalityDictionary:
pool.apply_async(worker)
# wait for all tasks to complete:
pool.close()
pool.join()
# required by windows
if __name__ == '__main__':
main()

我使用了pool.close()pool.join()来等待所有未完成的任务的完成。这之所以成为可能,是因为在查看代码时,您并没有从f_unction返回任何需要在主进程中处理的有意义的值。但是,您将无法检测到f_unction引发的任何异常。或者,您可以保存apply_async返回的AsyncResult实例,并在它们上调用方法get以阻止,直到任务完成。如果任务完成时出现异常,则此调用可能导致异常:

results = [pool.apply_async(worker) for municipality in municipalityDictionary]
# wait for all tasks to complete:
for result in results:
result.get()

如果要设置不同的池大小,请指定pool = Pool(pool_size)

如果对于municipalityDictionary中的每个键都要使用不同的值arg1arg2arg3,则应使用以下代码。但是,由于从您的代码中不清楚这些值来自哪里,也不清楚它们与municipalityDictionary的密钥有什么关系,出于演示目的,我假设有一个名为args的列表,如下所示:

args = [[arg1_0, arg2_0, arg3_0], [arg1_1, arg2_1, arg3_1], ... [arg1_N, arg2_N, arg3_N]]

由于似乎municipality的密钥,即municipality', are even being explicitly passed tof_function, it seems reasonable that it should be the length ofargs’,控制需要提交的任务数量。所以我们有:

from multiprocessing import Pool
args = [[arg1_0, arg2_0, arg3_0], [arg1_1, arg2_1, arg3_1], ... [arg1_N, arg2_N, arg3_N]]

def main():
# pool size is equal to multiprocessing.cpu_count():
pool = Pool() 
pool.starmap(f_unction, args)
# required by windows
if __name__ == '__main__':
main()

然而,如果municipalityDictionary的每个键都是一个元组,您希望对其进行解包并将其传递给f_unction,那么我们有:

from multiprocessing import Pool
def main():
# pool size is equal to multiprocessing.cpu_count():
pool = Pool()
pool.starmap(f_unction, municipalityDictionary.keys())
# required by windows
if __name__ == '__main__':
main()

最新更新