我有超过 10,000 个C
文件,我需要将它们中的每一个传递给某个应用程序foo.exe
,以便为每个C
文件处理和生成反汇编文件,即在此过程结束时,我将有 10,000 个lst
/output
个文件!假设,这个过程不是IO-Bound
(尽管foo.exe
为每个c
文件将新的lst
文件写入磁盘。
我的任务是
实现并行python程序以在最短的时间内完成工作! 通过利用所有CPU内核来完成此任务。
我的方法
我已经实现了这个程序,它对我有用,下面列出的伪代码:
- 遍历所有
c
文件,并在全局List
中为每个文件推送abs path
,files_list
。 - 计算
cpu
逻辑内核数(使用psutil
py 模块(,这将是稍后要调度的最大线程数。 假设它是8个线程。 - 生成新的列表,
workers_list
(它是一个列表列表(,其中包含将files_list
除以8产生的间隔或索引(L_index、R_index(。 例如,如果我有 800 个 C 文件,那么workers_list
将如下所示:workers_list = [[0-99],[100,199],...,[700,799]]
. - 调度 8 个线程,
workers
,每个线程将在workers_list
中操作单个条目。 每个线程将打开进程(subprocess.call(...)
(并调用当前c
文件上的foo.exe
。
在下面发布相关代码:
相关守则
import multiprocessing
import subprocess
import psutil
import threading
import os
class LstGenerator(object):
def __init__(self):
self.elfdumpExePath = r"C:.....elfdump.exe" #abs path to the executable
self.output_dir = r"C:.....out" #abs path to where i want the lst files to be generated
self.files = [] # assuming that i have all the files in this list (abs path for each .C file)
def slice(self, files):
files_len = len(files)
j = psutil.cpu_count()
slice_step = files_len / j
workers_list = []
lhs = 0
rhs = slice_step
while j:
workers_list.append(files[lhs:rhs])
lhs += slice_step
rhs += slice_step
j -= 1
if j == 1: # last iteration
workers_list.append(files[lhs:files_len])
break
for each in workers_list: #for debug only
print len(each)
return workers_list
def disassemble(self, objectfiles):
for each_object in objectfiles:
cmd = "{elfdump} -T {object} -o {lst}".format(
elfdump=self.elfdumpExePath,
object=each_object,
lst=os.path.join(self.outputs, os.path.basename(each_object).rstrip('o') + 'lst'))
p = subprocess.call(cmd, shell=True)
def execute(self):
class FuncThread(threading.Thread):
def __init__(self, target, *args):
self._target = target
self._args = args
threading.Thread.__init__(self)
workers = []
for portion in self.slice(self.files):
workers.append(FuncThread(self.disassemble, portion))
# dispatch the workers
for worker in workers:
worker.start()
# wait or join the previous dispatched workers
for worker in workers:
worker.join()
if __name__ == '__main__':
lst_gen = LstGenerator()
lst_gen.execute()
我的问题
- 我能以更有效的方式做到这一点吗?
- Python 是否有标准的库或模块来完成工作并降低我的代码/逻辑复杂性? 也许
multiprocessing.Pool
?
在Windows上运行,使用Python 2.7!
谢谢
是的,multiprocessing.Pool
可以帮助解决这个问题。这也完成了对每个 CPU 的输入列表进行分片的工作。这是python代码(未经测试(,应该让你上路。
import multiprocessing
import os
import subprocess
def convert(objectfile):
elfdumpExePath = "C:.....elfdump.exe"
output_dir = "C:.....out"
cmd = "{elfdump} -T {obj} -o {lst}".format(
elfdump=elfdumpExePath,
obj=objectfile,
lst=os.path.join(output_dir, os.path.basename(objectfile).rstrip('o') + 'lst'))
return cmd
files = ["foo.c", "foo1.c", "foo2.c"]
p = multiprocessing.Pool()
outputs = p.map(convert, files)
请记住,您的工作线程函数(convert
上面(必须接受一个参数。因此,如果您需要传入输入路径和输出路径,则必须将其作为单个参数完成,并且文件名列表必须转换为对列表,其中每对都是输入和输出。
上面的答案是针对python 2.7的,但请记住,python2已经到了它的生命周期的尽头。在 python3 中,您可以在with
语句中使用multiprocessing.Pool
,以便它自行清理。
在挣扎了一段时间后发布了我问题的答案,并注意到我可以在 python2.x 中导入concurrent.futures
! 这种方法将代码复杂度 RO 降至最低,甚至缩短了执行时间。 与我最初的想法不同,这些进程比 CPU 更受 IO 约束! 然而,我获得的时间效率足以方便使用多进程运行程序。
concurrent.futures
concurrent.futures 模块为异步执行可调用对象提供了一个高级接口。 异步执行可以使用线程执行,使用 ThreadPoolExecutor 或单独的进程,使用 ProcessPoolExecutor。 两者都实现相同的接口,该接口由抽象定义 执行器类。
class concurrent.futures.Executor
一个抽象类,它提供 异步执行调用的方法。不应使用它 直接,但通过其具体的子类。
submit(fn, *args, **kwargs)
调度可调用的 fn,作为 fn(*args **kwargs( 执行,并且 返回一个 Future 对象,该对象表示可调用对象的执行。
如需进一步阅读,请提供如下内容: 具有并发的并行任务。
<小时 />import multiprocessing
import subprocess
import psutil
import threading
import os
import concurrent.futures
class LstGenerator(object):
def __init__(self):
self.elfdumpExePath = r"C:.....elfdump.exe" #abs path to the executable
self.output_dir = r"C:.....out" #abs path to where i want the lst files to be generated
self.files = [] # assuming that i have all the files in this list (abs path for each .C file)
def disassemble(self, objectfile):
cmd = "{elfdump} -T {object} -o {lst}".format(
elfdump=self.elfdumpExePath,
object=objectfile,
lst=os.path.join(self.outputs, os.path.basename(objectfile).rstrip('o') + 'lst'))
return subprocess.call(cmd, shell=True,stdout=subprocess.PIPE)
def execute(self):
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(self.disassemble(file)) for file in self.files]
if __name__ == '__main__':
lst_gen = LstGenerator()
lst_gen.execute()