包装在xarray数据集中的dask数组子集上的并行任务



我有一个大的xarray。存储为zarr的数据集。我想对它执行一些自定义操作,这些操作不能通过使用Dask集群自动处理的类似numpy的函数来完成。因此,我将数据集划分为多个子集,并为每个子集向我的Dask集群提交形式的任务

def my_task(zarr_path, subset_index):
ds = xarray.open_zarr(zarr_path)  # this returns an xarray.Dataset containing a dask.array
sel = ds.sel(partition_index)
sel  = sel.load()  # I want to get the data into memory
# then do my custom operations
...

然而,我注意到这造成了";任务中的任务":当工人收到";my_task";,它又向集群提交任务以加载数据集的相关部分。为了避免这种情况,并确保整个任务在工作人员中执行,我提交了任务:

def my_task_2(zarr_path, subset_index):
with dask.config.set(scheduler="threading"):
my_task(zarr_path, subset_index)

这是最好的方法吗?对于这种情况,最好的做法是什么?

通常使用apply_ufuncmap_blocks等方法在Xarray数据集中的块之间并行应用函数。

最新更新