我想使用 dask 处理大约 5000 个批处理任务,这些任务将它们的结果存储在关系数据库中,在它们全部完成后,我想运行一个最终任务,该任务将查询 databse 并生成结果文件(将存储在 AWS S3 中(
所以它或多或少是这样的:
from dask import bag, delayed
batches = bag.from_sequence(my_batches())
results = batches.map(process_batch_and_store_results_in_database)
graph = delayed(read_database_and_store_bundled_result_into_s3)(results)
client = Client('the_scheduler:8786')
client.compute(graph)
这有效,但是:在处理即将结束时,许多工作线程处于空闲状态,我希望能够关闭它们(并在 AWS EC2 上节省一些钱(,但如果我这样做,调度程序将"忘记"这些任务已经完成,并尝试在剩余的工作线程上再次运行它们。
我知道这实际上是一个功能,而不是一个错误,因为 Dask 试图在开始之前跟踪所有结果 read_database_and_store_bundled_result_into_s3
,但是:有什么方法可以告诉我 dask 只编排分布式处理图而不用担心状态管理?
我建议你在它们完成后忘记它们。 此解决方案使用 dask.distributed concurrent.futures 接口,而不是 dask.bag。 特别是它使用as_completed迭代器。
from dask.distributed import Client, as_completed
client = Client('the_scheduler:8786')
futures = client.map(process_batch_and_store_results_in_database, my_batches())
seq = as_completed(futures)
del futures # now only reference to the futures is within seq
for future in seq:
pass # let future be garbage collected