用Xarray/dask替换并行化引导



我想在网格数据上替换n = 1000个自举。一个计算大约需要0.5秒。我可以访问带有48个内核的超级计算机独家节点。因为重新采样是彼此独立的,所以我天真地希望在全部或至少许多内核上分配工作量,并提高性能提高.8 * ncors。但是我不明白。

我仍然对Dask缺乏适当的理解。根据设置DASK工人数量的最佳实践,我使用:

from dask.distributed import Client
client = Client(processes=False, threads_per_worker=8, n_workers=6, memory_limit=‘32GB')

我还尝试了Slurmcluster,但我想我首先需要了解自己的工作,然后扩展。

我的MWE:

  1. 创建示例数据
  2. 写功能我要应用
  3. 写重新采样INITS函数
  4. 用bootstrap(= n(作为参数写引导函数:请参见下面的许多实现
  5. 执行自举
import dask
import numpy as np
import xarray as xr
from dask.distributed import Client
inits = np.arange(50)
lats = np.arange(96)
lons = np.arange(192)
data = np.random.rand(len(inits), len(lats), len(lons))
a = xr.DataArray(data,
                        coords=[inits, lats, lons],
                        dims=['init', 'lat', 'lon'])
data = np.random.rand(len(inits), len(lats), len(lons))
b = xr.DataArray(data,
                        coords=[inits, lats, lons],
                        dims=['init', 'lat', 'lon'])
def func(a,b, dim='init'):
    return (a-b).std(dim)
bootstrap=96
def resample(a):
    smp_init = np.random.choice(inits, len(inits))
    smp_a = a.sel(init=smp_init)
    smp_a['init'] = inits
    return smp_a

# serial function
def bootstrap_func(bootstrap=bootstrap):
    res = (func(resample(a),b) for _ in range(bootstrap))
    res = xr.concat(res,'bootstrap')
    # leave out quantile because not issue here yet
    #res_ci = res.quantile([.05,.95],'bootstrap')
    return res

@dask.delayed
def bootstrap_func_delayed_decorator(bootstrap=bootstrap):
    return bootstrap_func(bootstrap=bootstrap)

def bootstrap_func_delayed(bootstrap=bootstrap):
    res = (dask.delayed(func)(resample(a),b) for _ in range(bootstrap))
    res = xr.concat(dask.compute(*res),'bootstrap')
    #res_ci = res.quantile([.05,.95],'bootstrap')
    return res
for scheduler in ['synchronous','distributed','multiprocessing','processes','single-threaded','threads']:
    print('scheduler:',scheduler)
    def bootstrap_func_delayed_processes(bootstrap=bootstrap):
        res = (dask.delayed(func)(resample(a),b) for _ in range(bootstrap))
        res = xr.concat(dask.compute(*res, scheduler=scheduler),'bootstrap')
        res = res.quantile([.05,.95],'bootstrap')
        return res
    %time c = bootstrap_func_delayed_processes()

以下结果来自我的4个核心笔记本电脑。但是在超级计算机上,我也看不到加速,而是减少50%。

串行的结果:

%time c = bootstrap_func()
CPU times: user 814 ms, sys: 58.7 ms, total: 872 ms
Wall time: 862 ms

并行的结果:

%time c = bootstrap_func_delayed_decorator().compute()
CPU times: user 96.2 ms, sys: 50 ms, total: 146 ms
Wall time: 906 ms

从循环并行的结果:

scheduler: synchronous
CPU times: user 2.57 s, sys: 330 ms, total: 2.9 s
Wall time: 2.95 s
scheduler: distributed
CPU times: user 4.51 s, sys: 2.74 s, total: 7.25 s
Wall time: 8.86 s
scheduler: multiprocessing
CPU times: user 4.18 s, sys: 2.53 s, total: 6.71 s
Wall time: 7.95 s
scheduler: processes
CPU times: user 3.97 s, sys: 2.1 s, total: 6.07 s
Wall time: 7.39 s
scheduler: single-threaded
CPU times: user 2.26 s, sys: 275 ms, total: 2.54 s
Wall time: 2.47 s
scheduler: threads
CPU times: user 2.84 s, sys: 341 ms, total: 3.18 s
Wall time: 2.66 s

预期结果: - 加速(由.8 * ncores(

其他考虑因素: - 我还检查了是否应该块。太多的块。块状阵列需要更长的时间。

我的问题: - 关于DASK并行化我错了什么? - 客户设置是否没有用? - 我是否实现了dask。删除不够聪明? - 我的串行函数是否由于DASK已经并联执行?我认为不是。

我最终解决了这个问题。在发布此挑战时,我显然不了解它的几个方面:

  • 我在笔记本电脑上运行了带有两个物理内核的时间。这不允许在CPU结合的问题中进行太多的并行化。现在,我用48个逻辑CPU
  • 在节点上运行了此操作。
  • 我应该考虑到算法的哪些部分很容易平行,哪些部分不是。只有这样我才能相应地。

请参阅我的解决方案:https://gist.github.com/aaronspring/118abd7b9bf81e5555555555b1fced42eef427f

游戏改变者WRT。最初发布的代码:

  • 我不参与func(使用time(
  • 的尺寸(此处x(
  • 我仍然使用上述客户端:设置Dask工人数量的最佳实践
  • 我只尝试并行化迭代部分。分位数方法是在内存中完成的。

结论:它比预期的要简单。要点显示了dask.delayeddask.futures的实现,但在我的用例中甚至不需要。首先尝试了解并行性https://realpython.com/python-concurrency/并阅读dask文档https://dask.org/。

用多维索引

更快的解决方案

https://xskillscore.readthedocs.io/en/latest/api/xskillscore.core.core.resampling.resampleample.resample_ide_idx.html#xskillskillskillskillscore.core.core.resampling.resampling.resamplame.respample_idx_idx

相关内容

  • 没有找到相关文章

最新更新