在Dask并行运行两次机器学习培训



我已经在Docker上用工人分布式实现了Dask。我用Docker启动了10个工作人员,组成如下文件:

docker-compose up -d --scale worker=10

要运行两个模型的机器学习训练,我要执行以下操作:

y1 = data1[label1]
X1 = data1[features1] 
y2 = data2[label2]
X2 = data2[features2] 
with joblib.parallel_backend('dask'):
try:
model1.fit(X1, y1)
model2.fit(X2, y2)
except Exception as e:
logging.error('There's an error ' + str(e))

现在,我想同时进行两次训练。我可以使用工人1到5进行培训1,使用工人6到10进行培训2。但是,如何告诉分布式Dask将一些工人用于一项任务,而将其他工人用于另一项任务?

这个问题比较高级,但我会提供一些可能有用的建议。

首先,您编写的代码在本地运行大部分内容。要并行执行ML训练,您需要:

  1. 在集群上工作(本地或远程(
  2. 将数据存储在Dask阵列或数据帧中
  3. 使用dask.delayed任务

  1. 使用client.submit()API

1.创建(本地(集群

从你的代码中,还不清楚你是否实例化了一个客户端,所以也许只需仔细检查一下你是否遵循了这里的dask-ml-docs指令:

from dask.distributed import Client
import joblib
client = Client(processes=False)        # create local cluster
# import coiled                         # or connect to remote cluster
# client = Client(coiled.Cluster())     
with joblib.parallel_backend('dask'):
# your scikit-learn code

但是,请注意,scitkit learn的Dask joblib后端对于扩展CPU绑定的工作负载非常有用。要扩展到RAM绑定的工作负载(大于内存数据集(,您需要考虑使用dask-ml并行估计器之一,如下所示。

2.在Dask阵列中存储数据

下面的最小代码示例将两个伪数据集设置为Dask数组,并实例化K-Means聚类算法。

import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt
# create dummy datasets
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
chunks=1000000,
random_state=0,
centers=3)
X2, y2 = dask_ml.datasets.make_blobs(n_samples=10000000,
chunks=1000000,
random_state=3,
centers=3)
# persist predictor sets to cluster memory
X = X.persist()
X2 = X2.persist()
# instantiate KM model
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)

3.与Dask并行训练。延迟

下面的代码使用dask.delayedAPI并行运行训练。它遵循Dask文档中概述的最佳实践。

from dask import delayed
import dask
X = delayed(X)
X2 = delayed(X2)
@delayed
def train(model, X):
return model.fit(X)
# define task graphs (lazy evaluation, no computation triggered)
km1 = train(km, X)
km2 = train(km, X2)
# trigger computation and yield fitted models in parallel
km1, km2 = dask.compute(km1, km2)

4.与Futures和client.submit并行训练

或者,您可以使用client.submit()API进行并行训练。这会立即返回指向正在进行的计算的future,并最终返回存储的结果。点击此处阅读更多文档。

根据你的问题表述,我认为你在这里的首要任务是并行进行训练。这不需要手动将任务分配给特定的工人;Dask负责为您安排员工的时间安排和优化分配。如果你真的有兴趣手动将特定任务分配给特定的员工,我建议你看看这个SO的答案

最新更新