Dask延迟与xarray - compute()结果仍然延迟



我尝试用Dask和xarray对两个数据集进行一些分析(例如avg),然后计算两个结果之间的差异。

这是我的代码

cluster = LocalCluster(n_workers=5, threads_per_worker=3, **worker_kwargs)
def calc_avg(path):

mean = xr.open_mfdataset( path,combine='nested', concat_dim="time", parallel=True, decode_times=False, decode_cf=False)['var'].sel(lat=slice(south,north), lon=slice(west,east)).mean(dim='time')
return mean
def diff_(x,y):
return x-y
p1 = "/path/to/first/multi-file/dataset"
p2 = "/path/to/second/multi-file/dataset"
a = dask.delayed(calc_avg)(p1)  
b = dask.delayed(calc_avg)(p2)
total = dask.delayed(diff_)(a,b)
result = total.compute()

这里的执行时间是17秒。

然而,绘制结果(result.plot())需要1分钟以上的时间,因此似乎在尝试绘制结果时实际发生了计算。

这是正确的方式使用任务延迟吗?

您在delayed函数中包装了对xr.open_mfdataset的调用,这本身就是一个任务操作。所以当你调用result.compute时,你在执行calc_avgmean函数。但是,calc_avg返回一个由磁盘支持的DataArray。因此,是的,17s任务将calc_avgmean的调度delayed任务图转换为open_mfdataset和数组操作的调度dask.array任务图。

要解决这个问题,请删除延迟的包装器并简单地使用dask.arrayxarray工作流:

a = calc_avg(p1)  # this is already a dask array because
# calc_avg calls open_mfdataset
b = calc_avg(p2)  # so is this
total = a - b     # dask understands array math, so this "just works"
result = total.compute()    # execute the scheduled job

请参阅xarray指南中使用dask进行并行计算的介绍。

最新更新