在单台机器上解析 DASK 代码的问题



与 dask 并行比顺序编码慢。

我有一个嵌套的 for 循环,我试图在本地集群上并行,但找不到正确的方法。

我想并行内部循环。

我有 2 个大的 numpy 矩阵,我正在尝试迭代它们并对矩阵的子集进行数学计算。 尺寸:

data_mat.shape = (38, 243863)
indicies_mat.shape (243863, 27)
idxX.shape = (19,)
idxY.shape = (19,)

seq_code:

start = datetime.datetime.now()
for i in range(num+1):
if i == 0:
labels = np.array(true_labels)
else:
labels = label_mat[i]
idxX = list(np.where(labels == 1))
idxY = list(np.where(labels == 2))
ansColumn = []
for j in range(indices.shape[0]):
list_of_indices = [[i] for i in indices_slice]
dataX = (data_mat[idxX, list_of_indices]).T
dataY = (data_mat[idxY, list_of_indices]).T
ansColumn.append(calc_func(dataX, dataY))
if i == 0:
ansMat = ansColumn
else:
ansMat = np.c_[ansMat, ansColumn]

end = datetime.datetime.now()
print(end - start)

并行代码:

start = datetime.datetime.now()
cluster = LocalCluster(n_workers=4, processes=False)
client = Client(cluster)
for i in range(num+1):
if i == 0:
labels = np.array(true_labels)
else:
labels = label_mat[i]
idxX = list(np.where(labels == 1))
idxY = list(np.where(labels == 2))
[big_future] = client.scatter([data_mat], broadcast=True)
[idx_b] = client.scatter([idxX], broadcast=True)
[idy_b] = client.scatter([idxY], broadcast=True)

futures = [client.submit(prep_calc_func, idx_b, idy_b, indices[j, :], big_future) for j in range(indices.shape[0])]
ansColumn = []
for fut in dask.distributed.client.as_completed(futures):
ansColumn.append(fut.result())
if i == 0:
ansMat = ansColumn
else:
ansMat = np.c_[ansMat, ansColumn]

end = datetime.datetime.now()
print(end - start)

辅助功能:

def = prep_calc_func(idxX, idxY, subset_of_indices, data_mat):
list_of_indices = [[i] for i in indices_slice]
dataX = (data_mat[idxX, subset_of_indices]).T
dataY = (data_mat[idxY, subset_of_indices]).T
ret_val = calc_func(dataX, dataY)
return ret_val

本地机器:MacBook Pro(视网膜,13 英寸,2014 年中) 处理器: 2.6 GHz 英特尔酷睿 i5

硬件物理CPU:2 hw.logicalcpu: 4

内存: 8 GB 1600 MHz DDR3

当我执行 seq 代码时,需要 01:52 分钟才能完成(不到 2 分钟)

但是当我尝试并行代码时,它需要超过 15 分钟的时间。 (无论我使用哪种方法:计算,结果和客户端提交或dask延迟)

(我更喜欢使用 dask 分布式包,因为下一阶段也可能使用远程集群。

知道我做错了什么吗?

有些事情会变慢的原因有很多。 可能会有很多沟通。 你的任务可能太小(回想一下,Dask 的开销是每个任务大约 1 毫秒),或者完全是其他原因。 有关了解 Dask 性能的更多信息,我推荐以下文档:

  • https://docs.dask.org/en/latest/delayed-best-practices.html
  • https://docs.dask.org/en/latest/understanding-performance.html

最新更新