如何在任务完成时获得任务的结果,而不是在 Dask 中完成



我有一个 dask 数据帧,想要计算一些独立的任务。有些任务比其他任务快,但是在完成较长的任务后,我会得到每个任务的结果。

我创建了一个本地客户端并使用client.compute()发送任务。然后我使用 future.result() 来获取每个任务的结果。

我正在使用线程同时请求结果,并测量每个结果计算的时间,如下所示:

def get_result(future,i):
    t0 = time.time()
    print("calculating result", i)
    result = future.result()
    print("result {} took {}".format(i, time.time() - t0))
client = Client()
df = dd.read_csv(path_to_csv)
future1 = client.compute(df[df.x > 200])
future2 = client.compute(df[df.x > 500])
threading.Thread(target=get_result, args=[future1,1]).start()
threading.Thread(target=get_result, args=[future2,2]).start()

我希望上述代码的输出如下所示:

calculating result 1
calculating result 2
result 2 took 10
result 1 took 46

由于第一个任务更大。

但相反,我同时得到了两者

calculating result 1
calculating result 2
result 2 took 46.3046760559082
result 1 took 46.477620363235474

我认为这是因为 future2 实际上是在后台计算并在 future1 之前完成,但它等到 future1 完成才返回。

有没有办法在 future2 完成的那一刻获得它的结果?

您不需要创建线程以异步方式使用 futures——它们本质上已经是异步的,并在后台监控它们的状态。如果要按准备就绪的顺序获得结果,则应使用 as_completed .

但是,根据您的具体情况,您可能只想查看仪表板(或使用df.visulalize()(来了解正在发生的计算。这两种未来都依赖于阅读 CSV,在运行之前都需要执行这项任务 - 并且可能需要绝大多数时间。Dask 不知道,如果不扫描所有数据,哪些行具有 x 的值。

最新更新