如何并行化 python 脚本来处理 10,000 个文件?



我有超过 10,000 个C文件,我需要将它们中的每一个传递给某个应用程序foo.exe,以便为每个C文件处理和生成反汇编文件,即在此过程结束时,我将有 10,000 个lst/output个文件!假设,这个过程不是IO-Bound(尽管foo.exe为每个c文件将新的lst文件写入磁盘。

我的任务是

实现并行python程序以在最短的时间内完成工作! 通过利用所有CPU内核来完成此任务。

我的方法

我已经实现了这个程序,它对我有用,下面列出的伪代码:

  1. 遍历所有c文件,并在全局List中为每个文件推送abs pathfiles_list
  2. 计算cpu逻辑内核数(使用psutilpy 模块(,这将是稍后要调度的最大线程数。 假设它是8个线程。
  3. 生成新的列表,workers_list(它是一个列表列表(,其中包含将files_list除以8产生的间隔或索引(L_index、R_index(。 例如,如果我有 800 个 C 文件,那么workers_list将如下所示:workers_list = [[0-99],[100,199],...,[700,799]].
  4. 调度 8 个线程,workers,每个线程将在workers_list中操作单个条目。 每个线程将打开进程(subprocess.call(...)(并调用当前c文件上的foo.exe

在下面发布相关代码:

相关守则

import multiprocessing
import subprocess
import psutil
import threading
import os
class LstGenerator(object):
def __init__(self):
self.elfdumpExePath = r"C:.....elfdump.exe" #abs path to the executable 
self.output_dir = r"C:.....out"             #abs path to where i want the lst files to be generated
self.files = [] # assuming that i have all the files in this list (abs path for each .C file)

def slice(self, files):
files_len = len(files)
j = psutil.cpu_count()
slice_step = files_len / j
workers_list = []
lhs = 0
rhs = slice_step
while j:
workers_list.append(files[lhs:rhs])
lhs += slice_step
rhs += slice_step
j -= 1
if j == 1:  # last iteration
workers_list.append(files[lhs:files_len])
break
for each in workers_list:  #for debug only
print len(each)
return workers_list

def disassemble(self, objectfiles):
for each_object in objectfiles:
cmd = "{elfdump} -T {object} -o {lst}".format(
elfdump=self.elfdumpExePath,
object=each_object,
lst=os.path.join(self.outputs, os.path.basename(each_object).rstrip('o') + 'lst'))
p = subprocess.call(cmd, shell=True)

def execute(self):
class FuncThread(threading.Thread):
def __init__(self, target, *args):
self._target = target
self._args = args
threading.Thread.__init__(self)

workers = []
for portion in self.slice(self.files):
workers.append(FuncThread(self.disassemble, portion))
# dispatch the workers
for worker in workers:
worker.start()

# wait or join the previous dispatched workers
for worker in workers:
worker.join()


if __name__ == '__main__':
lst_gen = LstGenerator()
lst_gen.execute()

我的问题

  1. 我能以更有效的方式做到这一点吗?
  2. Python 是否有标准的库或模块来完成工作并降低我的代码/逻辑复杂性? 也许multiprocessing.Pool

在Windows上运行,使用Python 2.7!

谢谢

是的,multiprocessing.Pool可以帮助解决这个问题。这也完成了对每个 CPU 的输入列表进行分片的工作。这是python代码(未经测试(,应该让你上路。

import multiprocessing
import os
import subprocess
def convert(objectfile):
elfdumpExePath = "C:.....elfdump.exe"
output_dir = "C:.....out"
cmd = "{elfdump} -T {obj} -o {lst}".format(
elfdump=elfdumpExePath,
obj=objectfile,
lst=os.path.join(output_dir, os.path.basename(objectfile).rstrip('o') + 'lst'))
return cmd
files = ["foo.c", "foo1.c", "foo2.c"]
p = multiprocessing.Pool()
outputs = p.map(convert, files)

请记住,您的工作线程函数(convert上面(必须接受一个参数。因此,如果您需要传入输入路径和输出路径,则必须将其作为单个参数完成,并且文件名列表必须转换为对列表,其中每对都是输入和输出。

上面的答案是针对python 2.7的,但请记住,python2已经到了它的生命周期的尽头。在 python3 中,您可以在with语句中使用multiprocessing.Pool,以便它自行清理。

在挣扎了一段时间后发布了我问题的答案,并注意到我可以在 python2.x 中导入concurrent.futures! 这种方法将代码复杂度 RO 降至最低,甚至缩短了执行时间。 与我最初的想法不同,这些进程比 CPU 更受 IO 约束! 然而,我获得的时间效率足以方便使用多进程运行程序。

<小时 />

concurrent.futures

concurrent.futures 模块为异步执行可调用对象提供了一个高级接口。 异步执行可以使用线程执行,使用 ThreadPoolExecutor 或单独的进程,使用 ProcessPoolExecutor。 两者都实现相同的接口,该接口由抽象定义 执行器类。

class concurrent.futures.Executor
一个抽象类,它提供 异步执行调用的方法。不应使用它 直接,但通过其具体的子类。

submit(fn, *args, **kwargs)

调度可调用的 fn,作为 fn(*args **kwargs( 执行,并且 返回一个 Future 对象,该对象表示可调用对象的执行。

如需进一步阅读,请提供如下内容: 具有并发的并行任务。

<小时 />
import multiprocessing
import subprocess
import psutil
import threading
import os
import concurrent.futures
class LstGenerator(object):
def __init__(self):
self.elfdumpExePath = r"C:.....elfdump.exe" #abs path to the executable 
self.output_dir = r"C:.....out"             #abs path to where i want the lst files to be generated
self.files = [] # assuming that i have all the files in this list (abs path for each .C file)

def disassemble(self, objectfile):
cmd = "{elfdump} -T {object} -o {lst}".format(
elfdump=self.elfdumpExePath,
object=objectfile,
lst=os.path.join(self.outputs, os.path.basename(objectfile).rstrip('o') + 'lst'))
return subprocess.call(cmd, shell=True,stdout=subprocess.PIPE) 

def execute(self):
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(self.disassemble(file)) for file in self.files]


if __name__ == '__main__':
lst_gen = LstGenerator()
lst_gen.execute()

相关内容

  • 没有找到相关文章

最新更新