我只是在学习Dask,并了解它在令人尴尬的并行任务中的应用。我有一个函数,它从单个文件中读取数据,并对该数据执行长时间运行的计算。我通过使用joblib进行并行处理来加快计算速度。
我现在想用Dask将其扩展到多台分布式机器。我想请求一定数量的节点,让每台机器/节点处理一个文件池中的一个文件并返回结果。我希望每个文件的处理都能利用某种本地节点并行性。
如果这是MPI+OpenMP,我会在每台机器上有一个列,每个列中有作为OpenMP线程的物理核心的数量。有了Dask,我只知道如何创建一个庞大的工作人员库,共享每个文件或所有文件的处理。我想要复合并行性(每个节点一个文件,每个节点ncore进程帮助处理每个文件)
我试着用dask调用一个joblib函数,但它并没有使用每台机器上的所有核心。我也不知道如何通过客户端将提交的任务固定到某台机器上。
import joblib
import itertools
import numpy as np
#world's stupidest function as a simple illustrative example, matrix is read in from a file and this function is called on many different pairs in conjunction with the matrix to create a very expensive computation. parallelization over the list of pairs is trivial
def example(matrix, pair):
for i in range(100000): #takes almost no time
#for i in range(10000000): #takes a long time
x=np.exp(100)
return pair[0]+pair[1]+pair[2]
def my_parallel_example(matrix, pairs, num_jobs):
results= joblib.Parallel(n_jobs=num_jobs, verbose=10)(joblib.delayed(example)(matrix, pair) for pair in pairs)
return results
from dask_jobqueue import SGECluster
cores_per_node=24
cluster = SGECluster(
cores=1,
dashboard_address=':0',
job_extra=['-pe {} {}'.format(parallel_environment, cores_per_node), '-j y', '-o /dev/null'],
local_directory='$TMPDIR',
memory=100 GiB,
processes=cores_per_node,
project=project_name,
walltime='00:30:00'
)
#just requesting one 24-core machine
requested_cores=24
cluster.scale(requested_nodes)
client.wait_for_workers(requested_nodes)
matrix=None
possibilities=[1, 2, 3]
pairs = list(itertools.product(possibilities, possibilities, possibilities))
num_jobs=10
c=client.submit(my_parallel_bfs, matrix, pairs, num_jobs)
最终我想要这样的东西,但我搞不清语法:
matrices=[mat1, mat2, ...] #each matrix read from a seperate file and added to a pool of 'big' jobs to be tackled by a node
results=[]
for matrix in matrices:
c=client.submit(my_parallel_bfs, matrix, pairs, num_jobs) #each job is submitted to a node that then uses several cores/processes to compute the result corresponding to each file.
results.append(c.result() )
我在网上看到的一切似乎都在使用分布式dask进行令人尴尬的并行作业,而且我没有看到我的应用程序在每个节点嵌套一个大型计算并在每个大型计算上使用ncore子流程的例子。
我希望上面的代码能像我在没有dask的情况下直接运行它时那样有效地使用10个核心,但通过client.submit()提交它似乎只使用一个核心。我不知道如何扩展到多台机器,使每台机器在没有矩阵池中一个矩阵通信的情况下工作。
因此,在与此斗争了很长时间之后,在网上发现了许多关于混合joblib/Dask的问题(例如。https://github.com/joblib/joblib/issues/875),我想出了一个足够好的变通方法,可以发布部分答案。
理想情况下,我想用dask将独立的"父"任务提交给单独的客户端(机器),并让每个任务调用一个与joblib并行的计算密集型函数(每个核心使用一个joblib工作程序)。最终,即使在与各种选项(joblib中的线程与进程、每台机器的进程数等)进行了斗争之后,我也从未看到过它的良好性能。让dask调用joblib扼杀了我的并行性,也许引擎盖下面有一些锁定机制。
相反,我重写了程序,完全使用Dask(每个核心一个Dask工人,而不是每台机器一个工人),并去掉了joblib。
cores_per_node=24
cluster = SGECluster(
cores=cores_per_node,
dashboard_address=':0',
job_extra=['-pe {} {}'.format(parallel_environment, cores_per_node), '-j y', '-o /dev/null'],
local_directory='$TMPDIR',
memory=100 GiB,
processes=cores_per_node,
project=project_name,
walltime='00:30:00'
)
requested_nodes=2
total_workers=cores_per_node*requested_nodes
cluster.scale(total_workers)
client.wait_for_workers(total_workers)
在这一点上,我已经申请了2台24核心机器,Dask有48名工人,所以每个核心有一名工人。
#I was not able to distribute one "file" of work to a machine, but I was able to have multiple machines process one file.
info=dict #initialize empty dictionary
for file in list_of_files:
matrix=np.load(file)
remote_matrix=client.scatter(matrix) #necessary to prevent any dask warnings and seemed to improve performance by reducing communication. each worker
futures = []
for pair in pairs:
#now each pair is a dask job, previously this was sent as jobs to joblib
futures.append(client.submit(example, remote_matrix, pair, workers=first_host))
tmp_result=client.gather(futures) #blocks until all jobs completed
info[file]=tmp_result #store the results for one file
由于这个特殊问题的性质,这种变通方法只是一个合理的选择。它的并行性令人尴尬,并不真正需要两个级别的并行性,而是可以让多台机器一次处理一个文件。我仍然不确定我将如何处理这样一项任务,即我确实需要每台机器有一个"父"工作者来将子流程启动到各自的核心(除了切换到C/Fortran/C++和使用MPI/OMP之外)。
即使使用纯Dask,我也看不出有什么方法可以在不进行一些黑客攻击的情况下创建父/子关系(例如,创建客户端列表而不是使用scale_up),这让我可以控制任务/子任务在多台机器上的物理位置。