如何使用所有 CPU 对一大堆文件进行子处理



我需要在命令行中使用LaTeXML库将86,000个TEX文件转换为XML。我尝试编写一个 Python 脚本来使用subprocess模块自动执行此操作,利用所有 4 个内核。

def get_outpath(tex_path):
path_parts = pathlib.Path(tex_path).parts
arxiv_id = path_parts[2]
outpath = 'xml/' + arxiv_id + '.xml'
return outpath
def convert_to_xml(inpath):
outpath = get_outpath(inpath)
if os.path.isfile(outpath):
message = '{}: Already converted.'.format(inpath)
print(message)
return
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
stderr=subprocess.PIPE, 
stdout=subprocess.PIPE)
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
message = '{}: Converted!'.format(inpath)
print(message)
def start():
start_time = time.time()
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
maxtasksperchild=1)
print('Initialized {} threads'.format(multiprocessing.cpu_count()))
print('Beginning conversion...')
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
pass
pool.close()
pool.join()
print("TIME: {}".format(total_time))
start()

该脚本会导致Too many open files并降低我的计算机速度。从活动监视器来看,此脚本似乎正在尝试一次创建 86,000 个转换子进程,并且每个进程都在尝试打开一个文件。也许这是pool.imap_unordered(convert_to_xml, preprints)的结果——也许我不需要将 map 与subprocess.Popen结合使用,因为我有太多命令要调用?有什么替代方案?

我花了一整天的时间试图找出处理批量子处理的正确方法。我是 Python 这一部分的新手,所以任何朝着正确方向前进的提示将不胜感激。谢谢!

convert_to_xml中,process = subprocess.Popen(...)语句生成了一个latexml子进程。 如果没有像process.communicate()这样的阻塞调用,即使latexml继续在后台运行,convert_to_xml也会结束。

由于convert_to_xml结束,池会向关联的工作进程发送另一个任务来运行,因此再次调用convert_to_xml。 另一个latexml进程再次在后台生成。 很快,您在latexml进程中就达到了眼球,并且达到了打开文件数量的资源限制。

修复很简单:添加process.communicate()以告诉convert_to_xml等待latexml过程完成。

try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
stderr=subprocess.PIPE, 
stdout=subprocess.PIPE)
process.communicate()                                   
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)

关于if __name__ == '__main__'

正如Martineau指出的那样,多处理文档中有一个警告 生成新进程的代码不应在模块的顶层调用。 相反,代码应包含在if __name__ == '__main__'语句中。

在 Linux 中,如果您忽略此警告,就不会发生任何可怕的事情。 但在Windows中,代码"叉炸弹"。或者更准确地说,代码 导致生成未缓解的子进程链,因为在 Windows 上,fork是通过生成新的 Python 进程来模拟的,该进程随后导入调用脚本。每次导入都会生成一个新的 Python 进程。每个 Python 进程都尝试导入调用脚本。在消耗所有资源之前,循环不会中断。

因此,为了善待我们的Windows分叉兄弟,请使用

if __name__ == '__main__:
start()

有时进程需要大量内存。释放内存的唯一可靠方法是终止进程。maxtasksperchild=1告诉pool在完成 1 个任务后终止每个工作进程。然后,它会生成一个新的工作进程来处理另一个任务(如果有的话)。这将释放原始工作线程可能已分配的(内存)资源,否则无法释放这些资源。

在您的情况下,工作进程看起来不需要太多内存,因此您可能不需要maxtasksperchild=1。 在convert_to_xml中,process = subprocess.Popen(...)语句生成一个latexml子进程。 如果没有像process.communicate()这样的阻塞调用,即使latexml继续在后台运行,convert_to_xml也会结束。

由于convert_to_xml结束,池会向关联的工作进程发送另一个要运行的任务,因此再次调用convert_to_xml。 另一个latexml进程再次在后台生成。 很快,您在latexml进程中就达到了眼球,并且达到了打开文件数量的资源限制。

修复很简单:添加process.communicate()以告诉convert_to_xml等到latexml过程完成。

try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
stderr=subprocess.PIPE, 
stdout=subprocess.PIPE)
process.communicate()                                   
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)

chunksize会影响工作人员在将结果发送回主流程之前执行的任务数。 有时这可能会影响性能,尤其是在进程间通信是整个运行时的重要部分时。

在您的情况下,convert_to_xml需要相对较长的时间(假设我们等到latexml完成),它只是返回None。因此,进程间通信可能不是整个运行时的重要组成部分。因此,我不希望您在这种情况下发现性能发生重大变化(尽管实验永远不会有什么坏处!


在普通的 Python 中,map不应该只用于多次调用函数。

出于类似的风格原因,我会保留使用pool.*map*方法用于我关心返回值的情况。

所以而不是

for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
pass

您可以考虑使用

for preprint in preprints:
pool.apply_async(convert_to_xml, args=(preprint, ))

相反。


传递给任何pool.*map*函数的可迭代对象将被消耗 立即。 可迭代对象是否是迭代器并不重要。没有 在此处使用迭代器具有特殊的内存优势。imap_unordered返回一个 迭代器,但它不会在任何特别友好的迭代器中处理其输入 道路。

无论您传递哪种类型的可迭代对象,在调用pool.*map*函数时,可迭代对象是 消耗并转换为放入任务队列的任务。

以下是证实这一说法的代码:

version1.py:

import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x

def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x

def start():
pool = mp.Pool()
for item in pool.imap_unordered(foo, gen()):
pass
pool.close()
pool.join()
if __name__ == '__main__':
start()

version2.py:

import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x

def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x

def start():
pool = mp.Pool()
for item in gen():
result = pool.apply_async(foo, args=(item, ))
pool.close()
pool.join()
if __name__ == '__main__':
start()

运行version1.pyversion2.py都会产生相同的结果。

Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here

至关重要的是,您会注意到Got here打印速度非常快 10 次 运行的开始,然后有一个长时间的暂停(而计算 完成),在程序结束之前。

如果发电机gen()以某种方式被pool.imap_unordered缓慢消耗, 我们也应该期待Got here印刷得很慢。由于Got here是 打印 10 次并且快速,我们可以看到可迭代gen()正在 在任务完成之前就完全消耗了。

运行这些程序应该希望能给你信心pool.imap_unorderedpool.apply_async正在将任务放入队列中 基本上以相同的方式:在拨打电话后立即。

最新更新