How to normalize a dask array that is larger than memory



I am trying to normalize a dask array by a reduction of itself (e.g., b = a / a.sum(), with a and b being dask arrays). Computing this normalized array first computes everything needed to know the original array, and only then performs the division, so it spills to disk if the array does not fit into memory.

Code example:

from dask.distributed import Client
from dask import array as da
# Create 1000 MB array full of 1's with chunks of 50 MB
a = da.ones(shape=(int(1/8 * 1000e6), 1), chunks=(int(1/8 * 50e6), 1))
# Create normalized array with sum = 1
b = a / a.sum()
# Create cluster too small to hold all of a or b at once
client = Client(n_workers=1, threads_per_worker=1, memory_limit=500e6)
# Compute sum of b  (Spills to disk)
print(b.sum().compute())

Is there something like the following?

b = a / same_as_a_but_different_tasks.sum()

I worked around this by copying the array and renaming all tasks in its top layer:

from copy import deepcopy

def copy_with_renamed_top_layer(a, prepend_name="copy-of-"):
    # copy array and dask
    b = a.copy()
    b.dask = deepcopy(b.dask)
    # get new name
    orig_name = a.name
    new_name = prepend_name + orig_name
    # rename dependencies
    b.dask.dependencies[new_name] = b.dask.dependencies.pop(orig_name)
    # rename tasks of uppermost layer
    b.dask.layers[new_name] = b.dask.layers.pop(orig_name)
    b.dask.layers[new_name] = {
        (new_name, ) + k[1:]: v
        for k, v in b.dask.layers[new_name].items()
    }
    # rename array
    b.name = new_name
    return b

# Create 1000 MB array full of 1's with chunks of 50 MB
a = da.ones(shape=(int(1/8 * 1000e6), 1), chunks=(int(1/8 * 50e6), 1))
# copy and rename uppermost layer
a_copy = copy_with_renamed_top_layer(a)
# Create normalized array with sum = 1
b = a / a_copy.sum()

However, this is a highly fragile solution, as it relies on the current internal API.
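
A less fragile alternative might be the public graph-manipulation API: recent dask releases (roughly 2021.02 and later; whether your installed version has it is an assumption) ship dask.graph_manipulation.clone, which returns an equivalent collection whose task keys are regenerated, so the reduction would no longer share chunk tasks with the division. A minimal sketch, assuming that function is available:

from dask import array as da
from dask.graph_manipulation import clone

# Same 1000 MB array of ones with 50 MB chunks as above
a = da.ones(shape=(int(1/8 * 1000e6), 1), chunks=(int(1/8 * 50e6), 1))
# clone(a) has the same chunks and values as a, but independent task keys,
# so a.sum() computed from the clone does not pin the original chunk tasks
b = a / clone(a).sum()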
