DASK - 在执行期间停止工作线程会导致已完成的任务启动两次



我想使用 dask 处理大约 5000 个批处理任务,这些任务将它们的结果存储在关系数据库中,在它们全部完成后,我想运行一个最终任务,该任务将查询 databse 并生成结果文件(将存储在 AWS S3 中(

所以它或多或少是这样的:

from dask import bag, delayed batches = bag.from_sequence(my_batches()) results = batches.map(process_batch_and_store_results_in_database) graph = delayed(read_database_and_store_bundled_result_into_s3)(results) client = Client('the_scheduler:8786') client.compute(graph)

这有效,但是:在处理即将结束时,许多工作线程处于空闲状态,我希望能够关闭它们(并在 AWS EC2 上节省一些钱(,但如果我这样做,调度程序将"忘记"这些任务已经完成,并尝试在剩余的工作线程上再次运行它们。

我知道这实际上是一个功能,而不是一个错误,因为 Dask 试图在开始之前跟踪所有结果 read_database_and_store_bundled_result_into_s3 ,但是:有什么方法可以告诉我 dask 只编排分布式处理图而不用担心状态管理?

我建议你在它们完成后忘记它们。 此解决方案使用 dask.distributed concurrent.futures 接口,而不是 dask.bag。 特别是它使用as_completed迭代器。

from dask.distributed import Client, as_completed
client = Client('the_scheduler:8786')
futures = client.map(process_batch_and_store_results_in_database, my_batches())
seq = as_completed(futures)
del futures # now only reference to the futures is within seq
for future in seq:
    pass  # let future be garbage collected

最新更新