在Dask数组的列上应用函数



将函数应用于Dask数组的每个列的最有效方法是什么?如下所述,我已经尝试了许多方法,但我仍然怀疑我对Dask的使用相当业余。

我有一个很宽很长的数组,在3,000,000 x 10,000的范围内。我想对这个数组的每一列应用ecdf函数。单独的列结果堆叠在一起应该产生一个与输入数组具有相同维度的数组。

考虑以下测试,让我知道哪种方法是理想的,或者我可以如何改进。我知道,我可以只用最快的那个,但我真的想最大限度地利用Dask的可能性。数组也可以大几倍。与此同时,我的基准测试结果也让我感到惊讶。也许我没有正确理解Dask背后的逻辑。

import numpy as np
import dask
import dask.array as da
from dask.distributed import Client, LocalCluster
from statsmodels.distributions.empirical_distribution import ECDF
### functions
def ecdf(x):
fn = ECDF(x)
return fn(x)
def ecdf_array(X):
res = np.zeros_like(X)
for i in range(X.shape[1]):
res[:,i] = ecdf(X[:,i])

return res
### set up scheduler / workers
n_workers = 10
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)
client = Client(cluster)
### create data set 
X = da.random.random((100000,100)) #dask
Xarr = X.compute() #numpy
### traditional for loop
%timeit -r 10 foo1 = ecdf_array(Xarr)
### adjusting chunk size to 2d-array and map_blocks
X = X.rechunk(chunks=(X.shape[0],np.ceil(X.shape[1]/n_workers)))
Xm = X.map_blocks(lambda x: ecdf_array(x),meta = np.array((), dtype='float'))
%timeit -r 10 foo2 = Xm.compute()
### adjusting chunk size to column size and map_blocks
X = X.rechunk(chunks=(X.shape[0],1))
Xm = X.map_blocks(lambda x: np.expand_dims(ecdf(np.squeeze(x)),1),meta = np.array((), dtype='float'))
%timeit -r 10 foo3 = Xm.compute()
### map over columns by slicing
Xm = client.map(lambda i: ecdf(np.asarray(X[:,i])),range(X.shape[1]))
Xm = client.submit(lambda x: da.transpose(da.vstack(x)),Xm)
%timeit -r 10 foo4 = Xm.result()
### apply_along_axis
Xaa = da.apply_along_axis(lambda x: np.expand_dims(ecdf(x),1), 0, X, dtype=X.dtype, shape=X.shape)
%timeit -r 10 foo5 = Xaa.compute()
### lazy loop
Xl = []
for i in range(X.shape[1]):
Xl.append(dask.delayed(ecdf)(X[:,i]))

Xl = dask.delayed(da.vstack)(Xl)
%timeit -r 10 foo6 = Xl.compute()

在我的基准测试中,通过切片"是"将块大小调整为列大小"之后的最快方法吗?map_blocks"和非并行的&;apply_along_axis&;

MethodResults(10个循环)
传统for loop2.16 s±82.3 ms
调整块大小为2d-array &Map_blocks1.26 s±301 ms
调整块大小为列大小&Map_blocks926 ms±31.9
通过切片在列上映射316 ms±11.5 ms
apply_along_axis1.01±18.7毫秒
懒惰循环1.4±352 ms

据我所知,您的代码看起来是正确的(请参阅下面的解释,了解为什么map over columns by slicing的性能快得令人误解)。通过一些小的重构,"dask-y"版本可能是:

from dask.array.random import random
from numpy import zeros
from statsmodels.distributions.empirical_distribution import ECDF
n_rows = 100_000
X = random((n_rows, 100), chunks=(n_rows, 1))
_ECDF = lambda x: ECDF(x.squeeze())(x)
meta = zeros((n_rows, 1), dtype="float")
foo0 = X.map_blocks(_ECDF, meta=meta)
# executing foo0.compute() should take about 0.8s

请注意,dask数组是用适当的分块(每个分块一列)初始化的,而在当前代码中,执行时间将包括重新分块数组的时间。

就整体加速而言,单个计算很小(在50ms的规模上),因此为了减少任务数量,可以将多个列的多个处理分块到单个块中。但是,由于在numpy数组的列上进行迭代,这需要权衡与慢速相关的问题。其主要优点是减少了调度器的负担。根据最终数据集的规模和可用的计算资源,分块版本可能比非分块版本(即第一个片段)有轻微的优势:

from dask.array.random import random
from numpy import stack, zeros
from statsmodels.distributions.empirical_distribution import ECDF
n_rows = 100_000
n_cols = 100
chunk_size = (n_rows, 10)
X = random((n_rows, n_cols), chunks=chunk_size)
_ECDF = lambda x: ECDF(x.squeeze())(x)

def block_ECDF(x):
return stack([_ECDF(column) for column in x.T], axis=1)

meta = zeros(chunk_size, dtype="float")
foo0 = X.map_blocks(block_ECDF, meta=meta)
# executing foo0.compute() should take about 0.8s

注意,在你的基准测试中,最快的性能是map over columns by slicing。然而,这是一种误导,因为python在这里计时的只是计算结果的集合。大部分时间将花在计算上,所以计时这种方法的准确方法是在提交期货时启动计时器,并在收集结果时结束计时器。

最新更新