多处理队列,用于获取要处理的数据以进行处理



我有一个列表,其中包含需要提取的文件的文件名,我有一个提取这些文件的函数。而且由于它主要是使用CPU的任务,因此最好在多个进程之间生成它以利用多个CPU-s。

现在我的代码如下所示:

import multiprocessing
def unpack(files):
for f in files:
Archive(f).extractall('\path\to\destination\')

n_cpu = multiprocessing.cpu_count()
chunks = split(cabs_to_unpack, n_cpu) # just splits array into n equal chunks
for i in range(n_cpu):
p = Process(target=unpack, args=(chunks[i],))
p.start()
p.join()

但是要处理的文件在大小上有很大不同。有些文件是 1 kb,大多数是大约 300 kb,一些文件大约是 1.5Gb。

所以我的方法并不完美:5 个进程非常快速地处理它们的部分文件并退出,其他三个进程正在努力处理一些大文件和一堆小文件。因此,最好使快速过程不退出,但也处理这些小文件。

看起来在这里使用一些带有文件列表的队列会很好,它可以在多个进程中正常工作。我的解包函数看起来像这样:

def unpack(queue):
while queue.not_empty():
f = queue.get()
Archive(f).extractall('\path\to\destination\')

但是我在多处理模块中找不到此队列。唯一的多处理。队列不需要对象列表进行初始化,看起来它应该用作进程推送数据的容器,而不是从中获取数据的容器。

所以我的问题很简单,也许很愚蠢(我是多处理的新手),但是我应该使用哪个对象/类作为包含要处理数据的容器?

我推荐一个multiprocessing.Pool

from multiprocessing import Pool
def unpack(file_path):
Archive(file_path).extractall('\path\to\destination\')
pool = Pool()
pool.map(unpack, list_of_files)

它已经处理了块大小、工作进程的重用和进程处理逻辑。

相关内容

  • 没有找到相关文章

最新更新