我有一个大的xarray。存储为zarr的数据集。我想对它执行一些自定义操作,这些操作不能通过使用Dask集群自动处理的类似numpy的函数来完成。因此,我将数据集划分为多个子集,并为每个子集向我的Dask集群提交形式的任务
def my_task(zarr_path, subset_index):
ds = xarray.open_zarr(zarr_path) # this returns an xarray.Dataset containing a dask.array
sel = ds.sel(partition_index)
sel = sel.load() # I want to get the data into memory
# then do my custom operations
...
然而,我注意到这造成了";任务中的任务":当工人收到";my_task";,它又向集群提交任务以加载数据集的相关部分。为了避免这种情况,并确保整个任务在工作人员中执行,我提交了任务:
def my_task_2(zarr_path, subset_index):
with dask.config.set(scheduler="threading"):
my_task(zarr_path, subset_index)
这是最好的方法吗?对于这种情况,最好的做法是什么?
通常使用apply_ufunc
或map_blocks
等方法在Xarray数据集中的块之间并行应用函数。