Why doesn't Dask respect LocalCluster's memory limit?



I am (intentionally) running the code pasted below on a machine with 16 GB of RAM.

import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np
from dask_ml.cluster import KMeans
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1, processes=False,
                memory_limit='2GB', scheduler_port=0,
                silence_logs=False, dashboard_address=8787)

n_centers = 12
n_features = 4

# Fit a small sample first to get realistic cluster centers
X_small, y_small = make_blobs(n_samples=1000, centers=n_centers,
                              n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))
for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
print(centers)

n_samples_per_block = 450 * 650 * 900
n_blocks = 4

# Build one delayed block per random seed, then assemble a dask array
delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features),
                          dtype=X_small.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)
print(X)

X = X.rechunk((1000, 4))

clf = KMeans(init_max_iter=3, oversampling_factor=10)
clf.fit(X)
client.close()

Given that I am creating 4 workers with a 2 GB memory limit each (8 GB in total), I expected the algorithm not to exceed that amount of memory on this machine. Unfortunately, it uses more than 16 GB and starts swapping.

Unless I have misunderstood some Dask concepts, I really don't see what is wrong with this code (especially since it has no complexity in terms of data dependencies).
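For scale, the raw size of the array the snippet builds can be computed up front (a quick back-of-the-envelope check; `make_blobs` returns float64):

```python
n_samples_per_block = 450 * 650 * 900   # as in the snippet above
n_blocks = 4
n_features = 4
bytes_per_value = 8                     # float64

total_bytes = n_samples_per_block * n_blocks * n_features * bytes_per_value
print(f"{total_bytes / 2**30:.1f} GiB")  # ≈ 31.4 GiB total, vs 16 GiB of RAM
```

So the full dataset is roughly twice the machine's RAM, which is why spilling behavior matters here.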

This is not a direct answer to the question of why dask does not respect the memory constraint (the short answer seems to be that it is not a hard, binding constraint), but the code can be improved along these lines:

  • Use the dask_ml-adapted make_blobs: this reduces the overhead of constructing the dask array and the associated rechunking.
  • Use a context manager to create the client (and cluster): this handles .close more gracefully, especially when the code executed on the workers raises an error.
from dask.distributed import Client
from dask_ml.cluster import KMeans
from dask_ml.datasets import make_blobs

client_params = dict(
    n_workers=4,
    threads_per_worker=1,
    processes=False,
    memory_limit="2GB",
    scheduler_port=0,
    silence_logs=False,
    dashboard_address=8787,
)

n_centers = 12
n_features = 4
n_samples = 1000 * 100
chunks = (1000 * 50, 4)

# dask_ml's make_blobs generates a chunked dask array directly
X, _ = make_blobs(
    n_samples=n_samples,
    centers=n_centers,
    n_features=n_features,
    random_state=0,
    chunks=chunks,
)

clf = KMeans(init_max_iter=3, oversampling_factor=10, n_clusters=n_centers)

with Client(**client_params) as client:
    result = clf.fit(X)
    print(result.cluster_centers_)

This is an addition to @SultanOrazbayev's answer; it runs much faster than the original snippet and stays well within the allocated memory.
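As background for why the limit is not "binding": distributed workers react to memory_limit gradually, at configurable fractions of the limit rather than as a hard cap. A sketch of the relevant settings, using what I believe are the documented defaults (adjustable e.g. in ~/.config/dask/distributed.yaml):

```yaml
distributed:
  worker:
    memory:
      target: 0.60     # start spilling managed data to disk
      spill: 0.70      # spill based on total process memory
      pause: 0.80      # pause accepting new tasks
      terminate: 0.95  # nanny restarts the worker
```

Note that with processes=False the workers run as threads in a single process, so (as far as I understand) there is no nanny and the terminate threshold cannot be enforced.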

Using the "Workers memory" panel of the Dask Dashboard, I also see the total process memory exceeding the 2 GB memory limit, together with this warning: distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS. Unmanaged memory: 3.57 GiB -- Worker memory limit: 1.86 GiB, where the unmanaged memory keeps growing as the computation proceeds. In this situation the usual advice is to trim memory manually; however, as explained in a similar question on the Dask Discourse, this is not possible from within KMeans (it is an open issue). Hopefully this adds some useful context.
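For workloads where you do get control back between tasks, the manual trimming referred to above is typically done by asking glibc to return freed heap memory to the OS. A minimal sketch (Linux/glibc only; client is assumed to be a running dask.distributed Client):

```python
import ctypes

def trim_memory() -> int:
    """Ask glibc to release freed heap memory back to the OS (Linux only)."""
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# On a running cluster this would be executed on every worker, e.g.:
# client.run(trim_memory)
```

malloc_trim returns 1 if memory was actually released and 0 otherwise; it does not help here because KMeans gives no hook to call it mid-fit.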
