大型数据集,ProcessPoolExecutor问题



问题-ProcessPoolExecutor没有提高速度。tqdm确认

对python有足够的了解,可以复制和/或编写一个有效的程序。每个文件大约需要40秒才能加载->过滤器->写我有大约6800个文件要处理,我想要一个更好的版本,它使用了我所有的处理能力(6核),我试着写那个版本(下面)。所说的版本产生,但比我原来的功能稍慢:

from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer
decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers
def load_decode_filter(file):
'''`
Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
'''
output_message = []
with open(file, 'rb') as ins:
for bufr_message in generate_bufr_message(
decoder,ins.read()):
input_list = FlatJsonRenderer().render(bufr_message)[3][2] #necessary for [mask] to function
mask = [obj for obj in input_list if ((PHI_MAX > obj[
12] > PHI_MIN) & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
output_message.extend(mask)
return output_message
def main(files_in):
'''
attempt to intiate all cores in loading and filter bufr files
'''
with ProcessPoolExecutor(max_workers=6) as executor:
with tqdm(range(len(files_in)), desc='files loaded',
position=0) as progress:
futures = []
for file in files_in:
future = executor.submit(load_decode_filter(file), file)
future.add_done_callback(lambda p: progress.update())
futures.append(future)
results = []
for future in futures:
result = future.result()
results.append(result)
with open(DIRECTORY+'bufrout.json', 'w', encoding='utf-8') as f_o:
dump(results, f_o)
if __name__ == '__main__':
main(files)

我希望至少能减少每个文件的处理时间。


更新,结束:
首先,我要感谢所有发表评论的人以及回答者(我太新了,无法投票)。似乎有意义地提高效率的唯一方法是从不首先解码,并从原位bufr数据中获取我想要的东西,这超出了我目前的能力(这是我第一次接触任何类型的代码)。


我计划(目前)尽我所能运行我的初始版本(f.bufr-in,f.bufr_.txt-out),我会在每次"运行";。一线希望是,我在做这件事时已经学到了足够的东西,我将能够制作一个程序,将所有文本输出组合到一个文件中。再次感谢。

Q:
"问题-ProcessPoolExecutor没有提高速度。由tqdm确认">

A:
不,
恕我直言,
ProcessPoolExecutor()-实例的效率不是你的主要问题,但
你主要的问题是选择性能/效率(几乎)反模式,Python,Windows O/S领域中的Python子进程越多,你将不得不等待大约75个小时来收集所有结果(如果处理管道确实达到了你期望的效果,我无法判断,但我猜它不会……原因如下)

怀疑#1:
最好避免75小时产生无意义的输出:

给定文档化的标准Py3concurrent.futures.Executor()-实例.submit()-方法的调用签名,您的代码不符合此规范。

作为调用方,main()没有传递对函数的引用,而是首先为6800个文件中的每一个执行完整的纯[SERIAL]METOP工作包处理(这会产生一些昂贵的收集的巨大消息列表),然后(与将引用传递给函数/就地lambda运算符的记录要求相反)再次花费了巨大的RAM/CPU/TIME费用,SER/sended/DES被传输到Executor管理的工作进程池中的一个(我怀疑在收到列表时,它是否能够做任何合理的事情,而不是函数(计划在这样的远程进程中执行,通过传递给它的参数,根据调用签名指定)。哎哟

def main( files_in ):
'''                                                                 __doc__
attempt to intiate all cores in loading and filter bufr files
'''
with ProcessPoolExecutor( max_workers = 6
)  as executor: #---------------------------# eXe CONTEXT-mgr

with tqdm( range( len( files_in ) ),
desc     = 'files loaded',
position = 0
) as progress: #---------------------------------------# pro CONTEXT-in-CONTEXT-mgr

futures = []
for file in files_in: #---------------------------------------#     LUXURY of top-level iterator, commanding  6800x times a pool of workers
future = executor.submit( load_decode_filter( file ), #---#     ??? WHY CALC THE RESULT BEFORE passing it to the .submit() ???
file    #---#         std PARA
)
future.add_done_callback( lambda p: progress.update() )   #     LUXURY of tdqm() for showing 75-hours of work ???
futures.append( future ) #--------------------------------#     LUXURY of no performance gain

results = []
for future in futures:
result = future.result()
results.append( result ) #--------------------------------#     LUXURY of adverse performance gain

with open( DIRECTORY + 'bufrout.json', 'w',
encoding = 'utf-8'
) as f_o: #------------------------------------------------# f_o CONTEXT-mgr
dump( results, f_o )

怀疑#2:
最好避免任何&所有降低性能的语法构造函数,
如果性能是要实现的真正目标:

避免任何和所有的罪恶键入一种低挂的水果SLOC-s,这似乎是";性感";,但已经由巨大的附加管理成本支付。

设计流程,以便我们可以通过延迟屏蔽(文件I/O是一种经典情况)和避免任何可减少的步骤(创建命名变量(有时从未使用)类似于sin)来提高端到端处理时间。

如果你在Windows O/S中运行,你的(隐藏的)子进程实例化成本是所有其他情况中最高的-Windows O/S将生成Python解释器进程的完整自上而下的副本,包括所有数据结构等,所以如果这导致你的物理RAM";"过于拥挤";,O/S将开始(在这75个小时的剩余时间里…)一场从RAM到磁盘的、由虚拟内存管理的文件I/O传输(延迟约10.000x)的恶战;从磁盘到RAM。这将有效地损坏任何其他CPU的RAM I/O操作,我们可能会直接忘记任何关于提高性能的梦想。

pybufrkit承诺中,如果您的";过滤器";可使用pybufrkit-模板编译:

"(…)BUFR模板编译
模板编译的主要目的是性能。然而,由于位操作是整个处理中最耗时的部分。性能增益在一定程度上是有限的。根据要为消息处理的描述符的总数,模板编译可以提高10-30%的性能。阅读文档">

一如既往,熵减少代码:

def load_decode_filter( file ):
'''`
Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
'''
output_message = []
with open( file, 'rb' ) as ins: #----------------------------------- # ins CONTEXT-mgr
for idx,         
bufr_message  
in             
enumerate( generate_bufr_message( decoder,                   #     LUXURY of enumerate for no real use
ins.read() # <-------------# ins.
)
):
input_list = FlatJsonRenderer().render( bufr_message )[3][2] #     LUXURY of JSON-(re)-)decorations
mask = [ obj for obj in input_list                           #
if ( (    PHI_MAX > obj[12] >    PHI_MIN )
& ( LAMBDA_MAX > obj[13] > LAMBDA_MIN )
)
]
output_message.extend( mask )
return output_message

性能提示,如果既没有使用pybufrkit的本机编译模板,也没有使用pybufrkit的本机脚本CLI任务,并求助于Win/Py3处理流:

  • 考虑到Python解释器主进程的完整上下副本的无论如何都要支付的成本,您的工人应该"知道";所有文件的列表,所以这个令人尴尬的独立文件处理程序将最好地:

  • gc.collect(); gc.disable()在产卵任何一池工人之前

  • 生成与CPU-RAM物理内存相同数量的max_workers工作进程主机硬件上存在I/O通道(任务是内存绑定的,而不是CPU)

  • 拆分,在main()侧要处理的文件列表-使用max_workers的多个、平衡长度、不重叠的( from_fileIDX, to_fileIDX )元组

  • executor.submit()一个块处理函数引用,具有( from_, to_ )的单个元组,并将所有其余部分安排在这样的块处理函数内,包括延迟屏蔽的文件I/O存储结果(可以稍后合并,使用O/S文本/二进制文件合并)

  • 更喜欢延迟屏蔽流,在教科书中使用语法糖(ed)迭代器可能很好,但在这里,这些是(不可屏蔽的)性能杀手-收集一个庞大的[ obj for obj in ... if ... ]列表从来都不是为了改善类似流(可屏蔽延迟)的过程流,而不首先收集这样一个巨大的列表,只是为了下一步(重新)迭代这样一个巨大的列表到文件I/O这样一个列表的项目一个接一个到磁盘文件。在一个单一的步骤流中更好地迭代/过滤/有条件地执行文件I/O操作(减少RAM,避免附加开销以及所有这些都具有可屏蔽延迟)

有关更多详细信息,您可能喜欢阅读本文以及本文中的代码和有针对性的示例。

相关内容

  • 没有找到相关文章

最新更新