使用外部命令多处理数千个文件



我想从Python启动一个外部命令,以获取约8000个文件。每个文件都与其他文件独立处理。唯一的约束是一旦处理了所有文件,继续执行。我有4个物理内核,每个内核都有2个逻辑内核(multiprocessing.cpu_count()返回8)。我的想法是使用四个平行独立过程的池,这些过程将在8个内核中的4个中运行。这样,我的机器应该在此期间可用。

这是我一直在做的:

import multiprocessing
import subprocess
import os
from multiprocessing.pool import ThreadPool

def process_files(input_dir, output_dir, option):
    pool = ThreadPool(multiprocessing.cpu_count()/2)
    for filename in os.listdir(input_dir):  # about 8000 files
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        cmd = ['molconvert', option, f_in, '-o', f_out]
        pool.apply_async(subprocess.Popen, (cmd,))
    pool.close()
    pool.join()

def main():
    process_files('dir1', 'dir2', 'mol:H')
    do_some_stuff('dir2')
    process_files('dir2', 'dir3', 'mol:a')
    do_more_stuff('dir3')

顺序处理需要120秒的100个文件。上面概述的多处理版本(功能process_files)仅需20秒即可。但是,当我在整个8000个文件中运行process_files时,我的PC挂起并且一小时后不会取消冻结。

我的问题是:

1)我认为ThreadPool应该初始化一个进程池(确切地说,这里是multiprocessing.cpu_count()/2的过程)。但是,我的计算机挂在8000个文件上,但没有在100个文件上挂起,这表明可能没有考虑到池的大小。要么那个,要么我做错了什么。你能解释一下吗?

2)当每个人都必须启动外部命令时,这是在Python下启动独立流程的正确方法,并且以处理方式并非没有利用所有资源?

我认为您的基本问题是使用subprocess.Popen。该方法确实不是等待命令在返回之前完成。由于该函数立即返回(即使命令仍在运行),就可以就您的ThreadPool而言完成该功能,并且可以产生另一个功能...这意味着您最终会产生8000个左右的过程。

使用subprocess.check_call您可能会有更好的运气:

Run command with arguments.  Wait for command to complete.  If
the exit code was zero then return, otherwise raise
CalledProcessError.  The CalledProcessError object will have the
return code in the returncode attribute.

so:

def process_files(input_dir, output_dir, option):
    pool = ThreadPool(multiprocessing.cpu_count()/2)
    for filename in os.listdir(input_dir):  # about 8000 files
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        cmd = ['molconvert', option, f_in, '-o', f_out]
        pool.apply_async(subprocess.check_call, (cmd,))
    pool.close()
    pool.join()

如果您真的不在乎退出代码,则可能需要subprocess.call,如果该过程中的非零退出代码不会引起例外。

如果您使用的是Python 3,我会考虑使用concurrent.futures.ThreadPoolExecutormap方法。

另外,您可以自己管理子过程列表。

以下示例定义了启动ffmpeg的函数,将视频文件转换为Theora/vorbis格式。它返回每个启动子过程的一个popen对象。

def startencoder(iname, oname, offs=None):
    args = ['ffmpeg']
    if offs is not None and offs > 0:
        args += ['-ss', str(offs)]
    args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a',
            'libvorbis', '-q:a', '3', '-sn', oname]
    with open(os.devnull, 'w') as bb:
        p = subprocess.Popen(args, stdout=bb, stderr=bb)
    return p

在主程序中,维持代表运行子过程的Popen对象的列表。

outbase = tempname()
ogvlist = []
procs = []
maxprocs = cpu_count()
for n, ifile in enumerate(argv):
    # Wait while the list of processes is full.
    while len(procs) == maxprocs:
        manageprocs(procs)
    # Add a new process
    ogvname = outbase + '-{:03d}.ogv'.format(n + 1)
    procs.append(startencoder(ifile, ogvname, offset))
    ogvlist.append(ogvname)
# All jobs have been submitted, wail for them to finish.
while len(procs) > 0:
    manageprocs(procs)

因此,只有在运行子处理少于核心时,才开始一个新过程。多次使用的代码分离为manageprocs函数。

def manageprocs(proclist):
    for pr in proclist:
        if pr.poll() is not None:
            proclist.remove(pr)
    sleep(0.5)

使用sleep的呼叫用于防止程序在循环中旋转。

相关内容

  • 没有找到相关文章

最新更新