多处理池比手动实例化多个进程慢得多



我正在从一个大文件中读取一个块,将其作为行列表加载到内存中,然后处理每一行的任务。

顺序解决方案花费了太长时间,所以我开始研究如何将其并行化

我提出的第一个解决方案是使用Process并管理列表中每个子流程的部分。

import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()

def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data

with open(BIG_FILE_PATH, encoding="Latin-1") as file:
for piece in read_in_chunks(file, CHUNKSIZE):
jobs = []
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len/N_PROCESSES)
start = 0
for process in range(N_PROCESSES):
finish = start + item_delta
p = mp.Process(target=work, args=(piece_list[start:finish]))
start = finish
jobs.append(p)
p.start()
for job in jobs:
job.join()

它在大约2498ms内完成每个区块。

然后我发现了Pool工具来自动管理切片。

import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()

def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data

with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)

它在大约15540ms内完成每个块,比手动慢6倍,但仍然比顺序快。

我用错游泳池了吗?有更好或更快的方法吗?

感谢您的阅读。

更新

正如Hannu所建议的那样,游泳池的开销相当大。

Process方法调用的工作函数需要一个行列表。

Pool方法调用的工作函数需要一行,因为Pool是如何决定切片的。

我不太确定如何让游泳池一次给某个工人多排一行。

这应该能解决问题吗?

更新2

最后一个问题,还有第三种更好的方法吗?

我对此并不完全确定,但在我看来,您的程序在提交给工人的内容上存在实质性差异。

在您的Process方法中,您似乎提交了一大块行:

p = mp.Process(target=work, args=(piece_list[start:finish]))

但当你使用Pool时,你会这样做:

for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)

分块读取文件,但当您使用splitlines时,您的piece_list可迭代提交一个单位。

这意味着在流程方法中,您提交的子任务数量与CPU数量一样多,但在池方法中,提交的任务数量与源数据的行数量一样多。如果你有很多行,这将在你的池中产生巨大的编排开销,因为每个工作者一次只处理一行,然后完成,返回结果,然后池将另一行提交给新释放的工作者。

如果这就是这里正在发生的事情,这无疑解释了为什么Pool需要更长的时间才能完成。

如果你使用你的阅读器作为可迭代的,并跳过行分割部分会发生什么:

pool.map(work, read_in_chunks(file, CHUNKSIZE))

我不知道这是否可行,但你可以试试吗?

if __name__ == "__main__":
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)

我的理由是:
1。pool.map((,只需要一次,您的代码正在循环它
2.我猜循环会使它变慢
3.因为并行处理应该更快,呵呵

天哪!这是一次很难理解的旅程,但仍然很有趣。

Pool.map正在从迭代器中获取、pickle每个项,并将其分别传递给每个工作者。一旦工人完成,冲洗并重复,get->泡菜->通过。这会产生明显的管理费用。

这实际上是因为Pool.map不够智能,无法知道迭代器的长度,也无法有效地生成列表列表并将其中的每个列表(chunk(传递给工作者。

但是,这是可以帮助的。简单地将列表转换为具有列表理解的块列表(lists(就像一种魅力,并将开销降低到与Process方法相同的水平。

import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()

def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data

with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len / N_PROCESSES)
pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])

这个带有列表迭代器列表的Pool与Process方法的运行时间完全相同。

最新更新