用例:
10 台- 服务器处理 20 亿种参数组合(16 核 128GB RAM)
- 每个服务器使用 pool.apply_async() 处理 2 亿个组合(Python版本 3.7)
- 尽可能缩短总处理时间
问题:
- Python 会耗尽所有内存并抛出错误">运行时错误:无法启动新线程"和">OSError:[Errno 12] 无法分配内存">
我正在考虑将.apply_async()方法替换为.apply(),但我想通过将非阻塞模式更改为阻塞模式,这将对总处理时间产生严重影响。
任何人都可以帮助找到这种情况的最佳解决方案(消耗最少的时间)吗?
我的代码:
exec_log = multiprocessing.Manager().list([0, ''])
lock = multiprocessing.Manager().Lock()
cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)
# Parameters a to j
for a in a_list: # a_list contains 2 elements
for b in b_list: # b_list contains 2 elements
for c in c_list: # c_list contains 5 elements
for d in d_list: # d_list contains 10 elements
for e in e_list: # e_list contains 10 elements
for f in f_list: # f_list contains 5 elements
for g in g_list: # g_list contains 20 elements
for h in h_list: # h_list contains 10 elements
for i in i_list: # i_list contains 10 elements
for j in j_list: # j_list contains 10 elements
pool.apply_async(prestart, (df, start_date, end_date, curr_date,
analysis_period, a, b, c, d, e,
f, g, h, i, j, exec_log, lock))
pool.close()
pool.join()
logger.info(exec_log[1])
问:任何人都可以帮助找到这种情况的最佳解决方案(消耗最少的时间)吗?
当然,让我们检查一下购物清单的可见部分:
1)
避免Lock()
处理,鉴于任何形式的Lock()
-ing仍然存在,您希望对代码执行进行并行组织的愿望会一个接一个地重新编排成阻塞状态,重新编排纯[SERIAL]
(让所有其余的人等待轮到他们 - 无论一个人在工作中放置多少个大型RAM-on-steriods服务器 - 都在等待轮到他们, 大部分时间都在"抢"Lock()
)
2)避免任何形式的共享,鉴于任何形式的共享资源仍然存在,您希望拥有并行工作流(再次)
诉诸于等待任何此类共享资源从任何其他人的使用中释放出来,并可能开始被此过程使用。
3)
避免任何过多的进程/内存实例化(除了您已经经历过的RAM天花板崩溃之外,对于HPC级并行问题解决方案来说,这些也非常昂贵 - 两者都在:
[TIME]
域...实例化附加开销... ~ 数千个[ns]
和[SPACE]
域...新的内存分配需求由 O/S 以不断增长的规模处理,附加成本很高,远高于 ~ 大量数千[ns]
,最糟糕的是陷入虚拟内存交换窒息......同样,为移动数据存储器块(RAM 内副本的成本约为 ~ 300-350 [ns] 用于 NUMA-CPU 内核非本地 RAM 目标 + 基于数据卷的 I/O 带宽驱动和可用 RAM 通道可用性支付相关的[TIME]
域成本进一步限制了此类数据传输延迟。在陷入交换的情况下,操作系统编排的(即在您的控制范围之外)交换流的成本为 ~ 1,000 x ~ 10,000 x 差 ...并通过在这段时间内通过具有最高优先级来阻止任何其他计算和读/写 RAM 的尝试,因此没有人希望在计算过程中发生这些尝试的另一个原因)
使用multiprocessing.get_all_start_methods()
将显示本地主机操作系统可以提供的所有选项,以消除(小规模缓解)不需要的过度RAM分配。
使用len( os.sched_getaffinity(0) )
控制而不是上面使用的multiprocessing.cpu_count()
,将消除本地主机过度订阅免费使用的CPU内核的数量(也具有较少的"只是"[CONCURRENT]
-RAM副本的副本,这些副本必须等待其调度程序排序的轮次,然后它们的RAM/CPU执行轮到...),如果O/S亲和映射策略限制用户程序使用所有平台指示的硬件内核。
基于"糟糕"for
的"外部"迭代器可能总是得到改进,但核心策略更重要:
可以将代码重构为分布式系统计算:
一种干净且以性能为中心的分布式系统计算方法可以平衡实际成本与花费此类成本后将享受的净性能收益。
许多"语法"方法无法扩展到远远超过一些主要的增长规模 - 首先,通过经历从 In-CACHE 计算中驱逐到感受到 RAM 的实际成本(这在教科书示例和演示中没有观察到),接下来是 In-RAM 数据流的成本,对于越来越大的数据大小,最后但并非最不重要的一点是,来自多处理的天真期望的成本(其中, 根据 O/S 和版本,可能会引入分配许多完整 python 会话副本的生成附加成本,这可能会使内存错误崩溃 - 就像上面发布的情况一样)
给定 10 台服务器每个服务器有 16 个内核和 128 GB RAM,概念上有希望的举动将在使用数据计算prestart()
时测试 python-process 的大小,然后生成服务器内部的"worker"而不是直到它们都适合 RAM(以避免交换),接下来创建一个消息传递/信令元层来协调这个分布式工作线程池中这么多作业的智能参数传递, 使用任何高性能、低延迟的工具,如 ZeroMQ 或 nanomsg,并设计工作流程,以便您永远不会传递一条数据两次(因为参数传递的附加成本比 O(n) 更糟糕,由于系统和 O/S 属性),因此永远不要在性能驱动的系统中移动一段数据两次。
遵循这几条简单的规则并不便宜,但没有更快的方法(它们免费来得越少......