How do I parallelize iteration over a range using the StdLib and Python 3?



I've been searching for an answer for days now, to no avail. I probably just don't understand the pieces floating around out there, and the Python documentation on the multiprocessing module is rather large and not clear to me.

Say you have the following for loop:

import timeit

numbers = []
start = timeit.default_timer()
for num in range(100000000):
    numbers.append(num)
end = timeit.default_timer()
print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

Output:

TIME: 23.965870224497916 seconds
SUM: 4999999950000000

For this example, say you have a 4-core processor. Is there a way to create 4 processes in total, where each process runs on a separate CPU core and finishes roughly 4x faster, so 24 s / 4 processes = ~6 seconds?

Somehow divide the for loop into 4 equal chunks and then have the 4 chunks added into the numbers list to equal the same sum? There's this Stack Overflow thread: Parallel Simple For Loop, but I don't get it. Thanks everyone.

Yes, that's doable. Your computation doesn't depend on intermediate results, so you can easily divide the task into chunks and distribute it over multiple processes. This is what's known as an embarrassingly parallel problem.
The only tricky part here might be to split the range into fairly equal parts in the first place. Straight out of my personal lib, two functions to deal with this:

# mp_utils.py

from itertools import accumulate


def calc_batch_sizes(n_tasks: int, n_workers: int) -> list:
    """Divide `n_tasks` optimally between `n_workers` to get batch_sizes.

    Guarantees batch sizes won't differ for more than 1.

    Example:
    # >>> calc_batch_sizes(23, 4)
    # Out: [6, 6, 6, 5]

    In case you're going to use numpy anyway, use np.array_split:
    [len(a) for a in np.array_split(np.arange(23), 4)]
    # Out: [6, 6, 6, 5]
    """
    x = int(n_tasks / n_workers)
    y = n_tasks % n_workers
    batch_sizes = [x + (y > 0)] * y + [x] * (n_workers - y)

    return batch_sizes

def build_batch_ranges(batch_sizes: list) -> list:
    """Build batch_ranges from a list of batch_sizes.

    Example:
    # batch_sizes = [6, 6, 6, 5]
    # >>> build_batch_ranges(batch_sizes)
    # Out: [range(0, 6), range(6, 12), range(12, 18), range(18, 23)]
    """
    upper_bounds = [*accumulate(batch_sizes)]
    lower_bounds = [0] + upper_bounds[:-1]
    batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)]

    return batch_ranges
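
For illustration, a quick sanity check of the two helpers (a hypothetical snippet, not part of the original answer):

from mp_utils import calc_batch_sizes, build_batch_ranges

sizes = calc_batch_sizes(23, 4)     # [6, 6, 6, 5]
ranges = build_batch_ranges(sizes)  # [range(0, 6), range(6, 12), range(12, 18), range(18, 23)]

# Every task index 0..22 lands in exactly one sub-range.
assert sum(len(r) for r in ranges) == 23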

Your main script would then look like this:

import time
from multiprocessing import Pool

from mp_utils import calc_batch_sizes, build_batch_ranges


def target_foo(batch_range):
    return sum(batch_range)  # ~6x faster than target_foo1


def target_foo1(batch_range):
    numbers = []
    for num in batch_range:
        numbers.append(num)
    return sum(numbers)


if __name__ == '__main__':

    N = 100000000
    N_CORES = 4

    batch_sizes = calc_batch_sizes(N, n_workers=N_CORES)
    batch_ranges = build_batch_ranges(batch_sizes)

    start = time.perf_counter()
    with Pool(N_CORES) as pool:
        result = pool.map(target_foo, batch_ranges)
        r_sum = sum(result)
    print(r_sum)
    print(f'elapsed: {time.perf_counter() - start:.2f} s')

Note that I also switched the for-loop for a simple sum over the range object, since that offers much better performance. In case you can't do this in your real app, a list comprehension is still ~60% faster than filling your list manually like in your example.
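
To make these numbers concrete, here is a rough micro-benchmark sketch of the three variants (a smaller N so it finishes quickly; absolute timings will vary by machine):

import timeit

N = 10**7

def fill_loop():
    numbers = []
    for num in range(N):
        numbers.append(num)
    return sum(numbers)

def fill_comprehension():
    return sum([num for num in range(N)])

def sum_range():
    return sum(range(N))

# All three compute the same total; only the construction cost differs.
for f in (fill_loop, fill_comprehension, sum_range):
    print(f'{f.__name__}: {timeit.timeit(f, number=1):.2f} s')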

Example output:

4999999950000000
elapsed: 0.51 s
Process finished with exit code 0
Alternatively, you can let Pool.map do the chunking for you:

import timeit
from multiprocessing import Pool


def appendNumber(x):
    return x


start = timeit.default_timer()
with Pool(4) as p:
    numbers = p.map(appendNumber, range(100000000))
end = timeit.default_timer()

print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

So Pool.map is like the builtin map function. It takes a function and an iterable and produces a list of the results of calling that function on every element of the iterable. Here, since we don't actually want to change the elements in the range iterable, we just return the argument.
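
To illustrate that parallel, a minimal sketch with a hypothetical square function (the __main__ guard matters on platforms that spawn worker processes):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    sequential = list(map(square, range(10)))  # runs in this process
    with Pool(4) as p:
        parallel = p.map(square, range(10))    # runs in 4 worker processes
    assert sequential == parallel              # identical results either way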

The crucial part is that Pool.map divides the provided iterable (here range(100000000)) into chunks, sends them to the processes it owns (here the 4 defined in Pool(4)), and then joins the results back into a single list.
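
Pool.map also accepts an explicit chunksize argument if the default chunking doesn't suit you; larger chunks mean fewer pickling round-trips between the parent and the workers. A sketch, with the chunk size picked arbitrarily for illustration:

from multiprocessing import Pool

def identity(x):
    return x

if __name__ == '__main__':
    with Pool(4) as p:
        # Without chunksize, Pool picks one heuristically; here each batch
        # sent to a worker holds 50000 elements.
        numbers = p.map(identity, range(1000000), chunksize=50000)
    print(sum(numbers))  # 499999500000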

The output I get when I run this is:

TIME: 8.748245699999984 seconds
SUM: 4999999950000000

I did a comparison, and splitting up the task can sometimes take more time than doing it in a single process:

File multiprocessing_summation.py:

def summation(lst):
    # Sum the half-open interval [lst[0], lst[1]).
    total = 0
    for x in range(lst[0], lst[1]):
        total += x
    return total
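
A quick check of the helper on a small interval (hypothetical, not part of the original answer):

import multiprocessing_summation as mps

# The tuple is a half-open interval, so this sums 1 through 10.
print(mps.summation((1, 11)))  # 55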

File multiprocessing_summation_master.py:

import multiprocessing as mp
import timeit
import os
import sys

import multiprocessing_summation as mps

if __name__ == "__main__":
    if len(sys.argv) == 1:
        print(f'{sys.argv[0]} <number1 ...>')
        sys.exit(1)
    else:
        args = [int(x) for x in sys.argv[1:]]

    nBegin = 1
    nCore = os.cpu_count()

    for nEnd in args:
        ### Approach 1: single process ###
        start = timeit.default_timer()
        answer1 = mps.summation((nBegin, nEnd + 1))
        end = timeit.default_timer()
        print(f'Answer1 = {answer1}')
        print(f'Time taken = {end - start}')

        ### Approach 2: multiprocessing ###
        start = timeit.default_timer()
        # Build chunk boundaries, then pair them into (start, stop) intervals.
        lst = []
        for x in range(nBegin, nEnd, int((nEnd - nBegin + 1) / nCore)):
            lst.append(x)
        lst.append(nEnd + 1)
        lst2 = []
        for x in range(1, len(lst)):
            lst2.append((lst[x - 1], lst[x]))

        with mp.Pool(processes=nCore) as pool:
            answer2 = pool.map(mps.summation, lst2)
        end = timeit.default_timer()
        print(f'Answer2 = {sum(answer2)}')
        print(f'Time taken = {end - start}')

Then run the second script:

python multiprocessing_summation_master.py 1000 100000 10000000 1000000000

The output is:

Answer1 = 500500
Time taken = 4.558405389566795e-05
Answer2 = 500500
Time taken = 0.15728066685459452
Answer1 = 5000050000
Time taken = 0.005781152051264199
Answer2 = 5000050000
Time taken = 0.14532123447452705
Answer1 = 50000005000000
Time taken = 0.4903863230334036
Answer2 = 50000005000000
Time taken = 0.49744346392131533
Answer1 = 500000000500000000
Time taken = 50.825169837068
Answer2 = 500000000500000000
Time taken = 26.603663061636567
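
The first two runs show the fixed cost of spawning worker processes and shipping the data to them swamping the tiny amount of actual work. A rough way to see that fixed cost in isolation, assuming a no-op task (sketch):

import timeit
from multiprocessing import Pool

def noop(x):
    return x

if __name__ == '__main__':
    start = timeit.default_timer()
    with Pool(4) as pool:
        pool.map(noop, range(4))  # almost no real work, just overhead
    print('pool startup + teardown: {} s'.format(timeit.default_timer() - start))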
