I have a script that processes files with multiprocessing. Here is a snippet:
import os
import multiprocessing

cores = multiprocessing.cpu_count()

def f_process_file(file):
    # rename file
    # convert file
    # add metadata
    pass

files = [f for f in os.listdir(source_path) if f.endswith('.tif')]

p = multiprocessing.Pool(processes=cores)
async_result = p.map_async(f_process_file, files)
p.close()
p.join()
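As an aside, on platforms that use the spawn start method (Windows, and macOS on recent Python versions), the Pool setup must sit under an import guard so that worker processes can re-import the module without re-running it; a minimal sketch of the same calls with the guard in place:

if __name__ == '__main__':
    p = multiprocessing.Pool(processes=cores)
    async_result = p.map_async(f_process_file, files)
    p.close()
    p.join()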
This works fine, except that I have to do some other things before calling f_process_file, which now takes additional arguments. Here is the snippet:
def f_process_file(file, inventory, variety):
    if variety > 1:
        # rename file with follow-up number
        # convert file
        # add metadata
        pass
    else:
        # rename file without follow-up number
        # convert file
        # add metadata
        pass
import collections

# create file list
files = [f for f in os.listdir(source_path) if f.endswith('.tif')]
# create inventory list
inventories = [fn.split('_')[2].split('-')[0].split('.')[0] for fn in files]
# count the number of files per inventory
counter = collections.Counter(inventories)

for file in files:
    inventory = file.split('_')[2].split('-')[0].split('.')[0]
    # keep the (inventory, count) pairs whose key matches this file's inventory
    matching = [s for s in sorted(counter.items()) if inventory in s]
    for key, variety in matching:
        f_process_file(file, inventory, variety)
I can't get this to work with multiprocessing. Do you have any suggestions?
I found this question and managed to solve my problem with apply_async. Here is the snippet:
cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)

for file in files:
    inventory = file.split('_')[2].split('-')[0].split('.')[0]
    matching = [s for s in sorted(counter.items()) if inventory in s]
    for key, variety in matching:
        pool.apply_async(f_process_file, (source, file, tmp, target, inventory, variety))

pool.close()
pool.join()
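Note that apply_async returns an AsyncResult handle; keeping those handles and calling get() on them surfaces exceptions that would otherwise be swallowed silently in the workers. A minimal sketch of that pattern, where work_items stands in for a hypothetical pre-flattened list of (file, inventory, variety) tuples:

handles = []
for file, inventory, variety in work_items:  # work_items is hypothetical
    handles.append(pool.apply_async(f_process_file, (source, file, tmp, target, inventory, variety)))
pool.close()
pool.join()
for h in handles:
    h.get()  # get() re-raises any exception that occurred in the worker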
The problem here is that your workload doesn't map cleanly onto multiprocessing.Pool: you are iterating in a nested loop, so the individual workloads are discovered incrementally. There are two possible ways to solve your problem. The first is to do the single-threaded computation up front and then use the Pool. To do that, first construct an object, which I'll call ProcessingArgs:
class ProcessingArgs:
    def __init__(self, file, inventory, variety):
        self.file = file
        self.inventory = inventory
        self.variety = variety
Then you can either modify f_process_file to accept a ProcessingArgs instance, or add a wrapper function that unpacks the object and calls f_process_file (a sketch of that wrapper follows the next snippet). Either way, your for loop now looks like this:
needs_processing = []
for file in files:
    inventory = file.split('_')[2].split('-')[0].split('.')[0]
    matching = [s for s in sorted(counter.items()) if inventory in s]
    needs_processing.extend([ProcessingArgs(file, inventory, variety) for key, variety in matching])

p = multiprocessing.Pool(processes=cores)
async_result = p.map_async(f_process_file, needs_processing)
p.close()
p.join()
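For the wrapper variant, a minimal sketch might look like this; unpack_and_process is my own name for it, and it has to be a module-level function so the Pool can pickle it:

def unpack_and_process(args):
    # unpack the ProcessingArgs instance and delegate to the original function
    return f_process_file(args.file, args.inventory, args.variety)

async_result = p.map_async(unpack_and_process, needs_processing)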
The other option is to use an async library:

import asyncio

await asyncio.gather(*(f_process_file(p) for p in needs_processing))

In that case you need to put the async keyword in front of def f_process_file so that asyncio knows it is a coroutine.
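Since await is only valid inside a coroutine, a minimal runnable sketch would wrap the gather call in an entry point, assuming f_process_file has been converted to an async def:

import asyncio

async def f_process_file(args):
    # rename / convert / add metadata for one ProcessingArgs instance
    ...

async def main():
    await asyncio.gather(*(f_process_file(p) for p in needs_processing))

asyncio.run(main())

Bear in mind that asyncio runs everything on a single thread, so this variant mainly helps when the per-file work is I/O-bound rather than CPU-bound.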