我不明白我想在Dask中做的事情是否可能…
目前,我有一长串繁重的文件。我使用多处理库来处理列表的每个条目。My函数打开和进入,对其进行操作,将结果以二进制文件的形式保存到磁盘,并返回None。一切正常。我这样做主要是为了减少RAM的使用。
我也想做同样的事。在Dask中,但我无法弄清楚如何并行保存二进制数据。在我看来,它应该是这样的:
for element in list:
new_value = func(element)
new_value.tofile('filename.binary')
,其中一次只能加载N个元素,其中N是工作元素的数量,每个元素在每个循环结束时被使用和遗忘。
有可能吗?
非常感谢您的建议!
这听起来确实是个可行的任务:
from dask import delayed, compute
@delayed
def myfunc(element):
new_value = func(element)
new_value.tofile('filename.binary') # you might want to
# change the destination for each element...
delayeds = [myfunc(e) for e in list]
results = compute(delayeds)
如果您想对任务进行良好的控制,您可能希望通过启动LocalCluster
:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=3)
client = Client(cluster)
自定义设置/工作流还有很多可以做的事情,但也许上面的方法适合你的用例。