python执行多个命令进行多处理



我有一个使用多处理处理文件的脚本。这里有一个片段:

from multiprocessing import Pool
import os
cores=multiprocessing.cpu_count()
def f_process_file(file):
rename file  
convert file
add metadata
files=[f for f in os.listdir(source_path) if f.endswith('.tif')]
p =  multiprocessing.Pool(processes = cores)
async_result = p.map_async(f_process_file, files)
p.close()
p.join()

它运行得很好,只是在调用具有其他参数的f_process_file之前,我必须执行一些其他操作。以下是片段:

def f_process_file(file, inventory, variety):
if variety > 1:
rename file with follow-up number 
convert file
add metadata
else: 
rename file without follow-up number 
convert file
add metadata
# create list 
files=[f for f in os.listdir(source_path) if f.endswith('.tif')]
# create inventory list
inventories = [fn.split('_')[2].split('-')[0].split('.')[0] for fn in files]
# Check number of files per inventory 
counter=collections.Counter(inventories)
for file in files:
inventory = file.split('_')[2].split('-')[0].split('.')[0]
matching = [s for s in sorted(counter.items()) if inventory in s]
for key,variety in matching:  
f_process_file(file, inventory, variety)

我无法使用多处理来执行此操作。你有什么建议吗?

我发现了这个问题,并设法用apply_async解决了我的问题。以下是片段:

cores=multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=procs)
for file in files:
inventory = file.split('_')[2].split('-')[0].split('.')[0]
matching = [s for s in sorted(counter.items()) if inventory in s]
for key,variety in matching: 
pool.apply_async(f_process_file, (source, file, tmp, target, inventory, variety))
pool.close()
pool.join()

这里的问题是您的工作负载不适合multiprocessing.Pool。您正在进行嵌套迭代,因此,可能会以增量方式访问多个工作负载。有两种可能的方法来解决你的问题。第一种方法是先进行单线程计算,然后使用Pool。要做到这一点,首先构造一个对象,我将称之为ProcessingArgs:

def class ProcessingArgs:
def __init__(self, file, inventory, variety):
self.File = file
self.Inventory = inventory
self.Variety = variety

然后,您可以修改f_process_file以接受ProcessArgs,也可以添加一个包装器方法来分解类,然后调用f_process_file。不管怎样,你的for循环现在看起来是这样的:

needs_processing = []
for file in files:
inventory = file.split('_')[2].split('-')[0].split('.')[0]
matching = [s for s in sorted(counter.items()) if inventory in s]
needs_processing.extend( [ProcessingArgs(file, inventory, variety) for key, variety in matching] )
p = multiprocessing.Pool(processes = cores)
async_result = p.map_async(f_process_file, needs_processing)
p.close()
p.join()

另一种选择是使用异步库:

import asyncio
await asyncio.gather(f_process_file(p for p in needs_processing))

在这种情况下,您需要将async修饰符预先设置为def f_process_file,以便asyncio知道它是一个异步函数。

相关内容

  • 没有找到相关文章

最新更新