>我有一个自定义DAG,例如:
dag = {'load': (load, 'myfile.txt'),
'heavy_comp': (heavy_comp, 'load'),
'simple_comp_1': (sc_1, 'heavy_comp'),
'simple_comp_2': (sc_2, 'heavy_comp'),
'simple_comp_3': (sc_3, 'heavy_comp')}
我希望计算密钥simple_comp_1
、simple_comp_2
和simple_comp_3
,我执行如下:
import dask
from dask.distributed import Client
from dask_yarn import YarnCluster
task_1 = dask.get(dag, 'simple_comp_1')
task_2 = dask.get(dag, 'simple_comp_2')
task_3 = dask.get(dag, 'simple_comp_3')
tasks = [task_1, task_2, task_3]
cluster = YarnCluster()
cluster.scale(3)
client = Client(cluster)
dask.compute(tasks)
cluster.shutdown()
似乎,在没有缓存的情况下,这 3 个键的计算也会导致 heavy_comp
的计算 3 次。由于这是一个繁重的计算,我尝试从这里实现机会主义缓存,如下所示:
from dask.cache import Cache
cache = Cache(2e9)
cache.register()
但是,当我尝试打印正在缓存的结果时,我什么也没得到:
>>> cache.cache.data
[]
>>> cache.cache.heap.heap
{}
>>> cache.cache.nbytes
{}
我什至尝试将缓存大小增加到 6GB,但没有效果。我做错了什么吗?如何让 Dask 缓存密钥heavy_comp
的结果?
扩展 MRocklin的答案,并在问题下方的评论中格式化代码。
一次计算整个图形可以按预期工作。 heavy_comp
只会执行一次,这是您想要的。请考虑您在空函数定义完成的注释中提供的以下代码:
def load(fn):
print('load')
return fn
def sc_1(i):
print('sc_1')
return i
def sc_2(i):
print('sc_2')
return i
def sc_3(i):
print('sc_3')
return i
def heavy_comp(i):
print('heavy_comp')
return i
def merge(*args):
print('merge')
return args
dag = {'load': (load, 'myfile.txt'), 'heavy_comp': (heavy_comp, 'load'), 'simple_comp_1': (sc_1, 'heavy_comp'), 'simple_comp_2': (sc_2, 'heavy_comp'), 'simple_comp_3': (sc_3, 'heavy_comp'), 'merger_comp': (merge, 'sc_1', 'sc_2', 'sc_3')}
import dask
result = dask.get(dag, 'merger_comp')
print('result:', result)
它输出:
load heavy_comp sc_1 sc_2 sc_3 merge result: ('sc_1', 'sc_2', 'sc_3')
如您所见,"heavy_comp"只打印一次,表明函数heavy_comp
只执行过一次。
核心 Dask 库中的机会缓存仅适用于单机调度程序,不适用于分布式调度程序。
但是,如果您只是一次计算整个图形,Dask 将智能地保留中间值。 如果存在您想要保留的值,则还可以查看persist
函数。