Computing a dict of delayed functions with dask



I want to parallelize this code:

-        "mean": float(zonal_extract.mean().compute()),
-        "min": float(zonal_extract.min().compute()),
-        "max": float(zonal_extract.max().compute()),
-        "sum": float(zonal_extract.sum().compute()),
-        "stddev": float(zonal_extract.std().compute()),
-        "var": float(zonal_extract.var().compute()),

This is my first time trying to parallelize something in Python, as opposed to calling the same function over and over. It's the same data, different functions.

Attempt 1

from dask import compute, delayed

results = delayed({})
results["mean"] = zonal_extract.mean
results["min"] = zonal_extract.min
results["max"] = zonal_extract.max
results["sum"] = zonal_extract.sum
results["stddev"] = zonal_extract.std
results["var"] = zonal_extract.var
results = compute(results, num_workers=4)  # , scheduler='processes'
results = {k: float(v) for k, v in results.items()}

Attempt 2

mean, min, max, sum, stddev, var = compute(
    zonal_extract.mean(),
    zonal_extract.min(),
    zonal_extract.max(),
    zonal_extract.sum(),
    zonal_extract.std(),
    zonal_extract.var(),
    num_workers=4,
)  # , scheduler='processes'
# (broken: dict() can't be called with bare positional names like this)
results = {k: float(v) for k, v in dict(mean, min, max, sum, stddev, var).items()}

This seems like a simple task, but I couldn't find anything that worked. Maybe it's because I was already inside a multiprocessing context with nested threading (which is probably not a thing, but sounds cool), or because of this error:

L = Parallel(n_jobs=-1)(
  File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 1056, in __call__
    self.retrieve()
  File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 935, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/usr/local/lib/python3.9/dist-packages/joblib/_parallel_backends.py", line 542, in wrap_future_result
    return future.result(timeout=timeout)
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
TypeError: Delayed objects of unspecified length are not iterable

real    0m25.048s
user    0m46.943s

Edit:

Oh, it's because joblib's delayed was shadowing dask's:

from dask import compute, delayed
from joblib import Parallel, delayed
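
One way to sidestep the clash, if both functions are needed by name, is to alias one of the imports. A minimal sketch (dask_delayed is just an arbitrary alias, not an API name):

import dask
from joblib import Parallel, delayed      # joblib's delayed keeps the short name
from dask import delayed as dask_delayed  # dask's delayed stays available under an alias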

@creanion's answer is good, but I would also point out that operations like mean(), var(), std() etc. should not be wrapped in dask.delayed objects: these operations are already lazy, so dask.compute() can be called on them directly.

So, a minimal example without the delayed wrapper is:

import dask
import dask.array as da

# Generate some fake data
zonal_extract = da.random.uniform(size=(100,), chunks=10)

summary_stats = {
    "mean": zonal_extract.mean(),
    "std": zonal_extract.std(),
    "var": zonal_extract.var(),
    "min": zonal_extract.min(),
    "max": zonal_extract.max(),
}

# traverse=True is the default, but being explicit
summary_stats_computed, = dask.compute(summary_stats, traverse=True)

which produces (with my roll of the random numbers):

{'mean': 0.4903848677019127,
 'std': 0.30733105780457826,
 'var': 0.09445237909128101,
 'min': 0.000996718178509548,
 'max': 0.9981326789252434}

dask.compute will recurse into the dict for you.
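
The traversal also handles nested containers, so (a sketch, reusing the fake zonal_extract from above) a mixture of lists and dicts of lazy results computes the same way:

nested = {
    "extremes": [zonal_extract.min(), zonal_extract.max()],
    "spread": {"std": zonal_extract.std(), "var": zonal_extract.var()},
}
nested_computed, = dask.compute(nested)  # traverse=True by default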

You could write it like this:

results = dict(
    mean=dask.delayed(zonal_extract.mean)(),
    min=dask.delayed(zonal_extract.min)(),
    # and more
)
results = dask.compute(results)[0]

The basic idea is that delayed computations can be nested inside tuples, lists, dicts and so on, and then passed to dask.compute. All that's needed here is to turn the method calls into fully delayed objects.

We can be more "efficient" about not repeating ourselves:


computations = {k: dask.delayed(getattr(zonal_extract, k))()
                for k in "mean min max sum std var".split()}
results = dask.compute(computations)[0]

Stepping back, though, this level of parallelization seems too low to me: these are aggregations that are not arithmetically intensive, and they all traverse the same data to do their work. var is just the square of std, so in that sense it's even harder to get a speed-up.
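
For example (a sketch, exploiting the fact that var is the square of std), one of the two aggregations can be dropped and derived arithmetically instead:

# compute std once, then derive var without a second pass over the data
std_value = float(zonal_extract.std().compute())
var_value = std_value ** 2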

The main problem was that I had imported two things with the same function name. Changing this

from dask import compute, delayed
from joblib import Parallel, delayed

to this

import dask
from joblib import Parallel, delayed

made the code from the second attempt start working:

# note: these names shadow the built-in min, max and sum
mean, min, max, sum, stddev, var = dask.compute(
    zonal_extract.mean(),
    zonal_extract.min(),
    zonal_extract.max(),
    zonal_extract.sum(),
    zonal_extract.std(),
    zonal_extract.var(),
    num_workers=3,
)
results = {
    k: float(v)
    for k, v in dict(
        mean=mean, min=min, max=max, sum=sum, stddev=stddev, var=var
    ).items()
}

But if anyone has a way to actually use dicts with dask without spelling every name out three times, I'd happily accept that answer.
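
For what it's worth, the two answers above can be combined so each statistic is spelled out only once. A sketch in the lazy-aggregation style, keeping stddev as the output key for the std method:

names = {"mean": "mean", "min": "min", "max": "max",
         "sum": "sum", "stddev": "std", "var": "var"}
lazy = {key: getattr(zonal_extract, attr)() for key, attr in names.items()}
computed, = dask.compute(lazy, num_workers=3)
results = {k: float(v) for k, v in computed.items()}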
