我一直在编写一个脚本来对大数据集进行分类和过滤大型数据集,并将工作拆分在多个CPU内核上(通过使用多个过程(,但是,Python似乎在每个过程中启动每个过程串行运行它们而不是并行运行。
我已经删除了代码,以便它基本上没有做任何有用的事情(它会生成随机数的列表并仅删除所有内容(,并且问题仍然存在。这是Mac上的Python的问题吗?
我在OS X 10.13.6上运行Python 3.7.1。
这是完整的代码:
import math
import multiprocessing
import os
import random
import sys
import timeit
def delete_all(passed_nums):
print("Started process: {}, {}".format(multiprocessing.current_process(), os.getpid()))
while (len(passed_nums) > 0):
passed_nums.remove(passed_nums[0])
print("Finished process: {}, {}".format(multiprocessing.current_process(), os.getpid()))
return passed_nums
def chunksl(l, n):
i = [l[i:i + n] for i in range(0, len(l), n)]
return i
def main():
rnd_nums = random.sample(range(1, 1000000), 500000)
num_processes = 1
Pool = multiprocessing.Pool(num_processes)
list_chunk_size_per_core = int(math.ceil(len(rnd_nums)/float(num_processes)))
unsorted_sub_lists = list(chunksl(rnd_nums, list_chunk_size_per_core))
print("Number of CPUs: {}".format(num_processes))
print("Chunk size per CPU: {}".format(list_chunk_size_per_core))
print("Number of chunks: {}".format(len(unsorted_sub_lists)))
start_time = timeit.default_timer()
sorted_sub_lists = Pool.map(delete_all, unsorted_sub_lists, list_chunk_size_per_core)
end_time = timeit.default_timer()
print('Duration: {}'.format(end_time - start_time))
return True
if __name__ == '__main__':
sys.exit(main())
这是num_processes = 1
的输出:
Number of CPUs: 1
Chunk size per CPU: 500000
Number of chunks: 1
Started process: <ForkProcess(ForkPoolWorker-1, started daemon)>, 1617
Finished process: <ForkProcess(ForkPoolWorker-1, started daemon)>, 1617
Duration: 23.922029328999997
这是num_processes = 2
的输出:
Number of CPUs: 2
Chunk size per CPU: 250000
Number of chunks: 2
Started process: <ForkProcess(ForkPoolWorker-1, started daemon)>, 1630
Finished process: <ForkProcess(ForkPoolWorker-1, started daemon)>, 1630
Started process: <ForkProcess(ForkPoolWorker-1, started daemon)>, 1630
Finished process: <ForkProcess(ForkPoolWorker-1, started daemon)>, 1630
Duration: 11.938197925
最后,这是num_processes = 1
的输出
Number of CPUs: 1
Chunk size per CPU: 250000
Number of chunks: 1
Started process: <ForkProcess(ForkPoolWorker-1, started daemon)>, 1639
Finished process: <ForkProcess(ForkPoolWorker-1, started daemon)>, 1639
Duration: 5.904828338
可以看出,当num_processes = 2
脚本运行速度时,但不是因为它是并行运行的,而是因为删除两个250K项目列表中的所有条目比删除一个500K项目列表中的所有条目(从一个500K中删除所有条目(当num_processes = 2
是num_processes = 1
时上次运行持续时间的两倍,但列表大小减小为250K条目,这也是第一次运行时间的一个夸脱(。
我的理解是,在启动新过程时,使用Pool.map()
每个进程都会收到其列表 unsorted_sub_lists
块的完整副本,这意味着多个过程没有阻止尝试同时访问原始unsorted_sub_lists
列表。Python并未参考新过程。我可以在脚本末尾打印列表unsorted_sub_lists
,原始内容仍然存在,所以我认为我的理解是正确的吗?
在n
进程的情况下,变量unsorted_sub_lists
具有n
元素。因此,当您通过chunksize=list_chunk_size_per_core
的list_chunk_size_per_core
为250K时,您将长度2的列表列为最大长度250k块,从本质上重复了每个过程中的工作。尝试将unsorted_sub_lists
修复为500K,或者只需删除chunksize
CAN CALL中的CC_17参数