python multiprocessing Pool.apply() RuntimeError



I am experimenting with Python multiprocessing using a simple example. I use Pool.apply() to get the common items of list_a and list_b, row by row.

import multiprocessing as mp
list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
def get_commons(list_1, list_2):
    return list(set(list_1).intersection(list_2))
pool = mp.Pool(mp.cpu_count())
results = [pool.apply(get_commons, args=(l1, l2)) for l1, l2 in zip(list_a, list_b)]
pool.close()
print(results)

This throws the following error:

An attempt has been made to start a new process before the current process has finished its bootstrapping phase.

While searching for similar problems, I tried using if __name__ == '__main__': as follows:

if __name__ == '__main__':
    results = [pool.apply(get_commons, args=(l1, l2)) for l1, l2 in zip(list_a, list_b)]

but had no luck.

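First, the RuntimeError itself: the if __name__ == '__main__': guard has to enclose the mp.Pool(...) call as well, not just the apply calls. Under the spawn start method (the default on Windows and, since Python 3.8, on macOS), every worker process re-imports your main module, so a module-level mp.Pool(...) tries to start new processes while that worker is still bootstrapping.

Second, when you call pool.apply you are not really multiprocessing: apply is a blocking method that does not return until the result is ready, so each task runs in a worker only after the previous one has finished. You can instead call apply_async, which returns immediately with an AsyncResult instance representing the future result, so you end up with a list of these. You can then loop over that list, calling get on each AsyncResult instance, which waits for that result to be ready and returns it. That is multiprocessing. Alternatively, you can call apply_async with the callback argument set to a function that will be called when each result is ready. Or, as in the code below, you can call starmap: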

import multiprocessing as mp
list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
def get_commons(list_1, list_2):
    return list(set(list_1).intersection(list_2))
if __name__ == '__main__':
    # The pool is created inside the guard and closed by the with block.
    with mp.Pool(mp.cpu_count()) as pool:
        results = pool.starmap(get_commons, zip(list_a, list_b))
    print(results)

Prints:

[[2, 3], [6], [11, 12], [21]]

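However, for a worker function as trivial as get_commons you will not see any performance improvement, because of the extra overhead of creating the processes and of passing arguments and results from one address space to another.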
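To check the overhead claim on your own machine, here is a minimal timing sketch. It assumes a made-up workload of 100,000 pairs of short random lists; run_serial, run_pool and make_pairs are hypothetical helper names. It times a plain loop against pool.starmap (pool start-up included) and checks that both give the same answers. Timings depend on the machine and start method, so no numbers are claimed here, but for a function this cheap you would usually expect the pool to be no faster and often slower.

```python
import multiprocessing as mp
import random
import time


def get_commons(list_1, list_2):
    # Same trivial worker as above: a few set operations per call.
    return list(set(list_1).intersection(list_2))


def run_serial(pairs):
    # Plain in-process loop: no pickling, no inter-process communication.
    return [get_commons(a, b) for a, b in pairs]


def run_pool(pool, pairs, chunksize=None):
    # Same work through a pool: every argument pair and every result is
    # pickled and sent between processes. chunksize=None lets the pool
    # choose how many tasks to batch per message.
    return pool.starmap(get_commons, pairs, chunksize)


def make_pairs(n, seed=0):
    # Hypothetical test data: n pairs of short random integer lists.
    rng = random.Random(seed)
    return [([rng.randrange(50) for _ in range(5)],
             [rng.randrange(50) for _ in range(5)]) for _ in range(n)]


if __name__ == "__main__":
    pairs = make_pairs(100_000)

    start = time.perf_counter()
    serial = run_serial(pairs)
    print(f"serial loop:  {time.perf_counter() - start:.3f} s")

    start = time.perf_counter()  # includes creating and tearing down the pool
    with mp.Pool(mp.cpu_count()) as pool:
        parallel = run_pool(pool, pairs)
    print(f"pool.starmap: {time.perf_counter() - start:.3f} s")

    assert parallel == serial  # same answers; only the timing differs
```

Taking the pool as a parameter keeps run_pool independent of how the pool is created, so the same helper works with any object that has a starmap method.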

Here is the code using pool.apply_async:
import multiprocessing as mp
list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
def get_commons(list_1, list_2):
    return list(set(list_1).intersection(list_2))
if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()) as pool:
        # apply_async returns an AsyncResult immediately for each task
        async_results = [pool.apply_async(get_commons, args=(l1, l2)) for l1, l2 in zip(list_a, list_b)]
        # get() blocks until that particular result is ready
        results = [async_result.get() for async_result in async_results]
    print(results)
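The difference between the blocking apply and the non-blocking apply_async can be made visible with a sleep-based sketch. The DELAY constant and the names slow_commons, with_apply and with_apply_async are hypothetical, and the sleep only stands in for real work. With four workers and four tasks you would expect the apply loop to take roughly 4 * DELAY and the apply_async version roughly DELAY, although exact timings will vary.

```python
import multiprocessing as mp
import time

DELAY = 0.2  # hypothetical per-task cost, simulated with sleep


def slow_commons(list_1, list_2):
    # get_commons plus an artificial delay so the scheduling is visible.
    time.sleep(DELAY)
    return list(set(list_1).intersection(list_2))


def with_apply(pool, pairs):
    # apply() blocks until each result is back, so tasks run one at a time.
    return [pool.apply(slow_commons, args=(a, b)) for a, b in pairs]


def with_apply_async(pool, pairs):
    # Submit everything first; each apply_async call returns at once.
    pending = [pool.apply_async(slow_commons, args=(a, b)) for a, b in pairs]
    # get() then waits for each result; the tasks have been running concurrently.
    return [r.get() for r in pending]


if __name__ == "__main__":
    list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
    list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
    pairs = list(zip(list_a, list_b))
    with mp.Pool(4) as pool:
        for runner in (with_apply, with_apply_async):
            start = time.perf_counter()
            results = runner(pool, pairs)
            elapsed = time.perf_counter() - start
            print(f"{runner.__name__}: {elapsed:.2f} s -> {results}")
```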

Here is the code using a callback:
import multiprocessing as mp
list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
def get_commons(list_1, list_2):
    return list(set(list_1).intersection(list_2))
def my_callback(result):
    # Runs in the main process as each task completes
    results.append(result)
if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    results = []
    for l1, l2 in zip(list_a, list_b):
        pool.apply_async(get_commons, args=(l1, l2), callback=my_callback)
    # wait for all submitted tasks to complete:
    pool.close()
    pool.join()
    print(results)

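Note that with the callback technique the results are appended in the order the tasks complete, which is not necessarily the order in which they were submitted.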
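If you want the callback style but still need the results in submission order, one option is to have each callback write into a pre-sized list at its task's index. This is a sketch; run_with_callbacks and make_callback are hypothetical names. A task that raises an exception never calls its callback, so its slot would stay None. apply_async also accepts an error_callback argument for handling that case.

```python
import multiprocessing as mp


def get_commons(list_1, list_2):
    return list(set(list_1).intersection(list_2))


def run_with_callbacks(pool, func, arg_tuples):
    # Pre-size the output so each callback can write into its own slot.
    results = [None] * len(arg_tuples)

    def make_callback(index):
        # Callbacks run in the parent process (in the pool's result-handler
        # thread), so they may close over local variables; only func and
        # its arguments are sent to the workers.
        def store(result):
            results[index] = result
        return store

    for index, args in enumerate(arg_tuples):
        pool.apply_async(func, args=args, callback=make_callback(index))
    # Wait for all submitted tasks to complete.
    pool.close()
    pool.join()
    return results


if __name__ == "__main__":
    list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
    list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
    with mp.Pool(mp.cpu_count()) as pool:
        print(run_with_callbacks(pool, get_commons, list(zip(list_a, list_b))))
```

The make_callback factory gives each callback its own index. A lambda defined directly in the loop would capture the loop variable late and write every result to the last slot.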
