有没有办法在 dask 中实现以下示例?
import time
from celery import Celery
app = Celery('celery_blog', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def sl():
time.sleep(1)
if __name__ == "__main__":
sleeper = sl.delay()
print('submitted')
print(f"sleeper done: {sleeper.ready()}")
time.sleep(2)
print(f"sleeper done: {sleeper.ready()}")
看着 http://distributed.dask.org/en/latest/asynchronous.html 在我看来,所有示例都需要await
后台任务才能启动任务,但是,等待块。另一方面,在用例中,它说这可以代替非阻塞行为的Celery
。我能找到的例子中没有一个显示类似Celery
片段的东西。我认为我错过了一个关键的拼图。因此,如果您能告诉我如何完成此操作或向我指出一个有用的链接,我将不胜感激。
谢谢
正如@mdurant在评论中所说,您可能正在寻找 Dask 的未来接口。 请参阅 https://docs.dask.org/en/latest/futures.html
from dask.distributed import Client
client = Client()
future = client.submit(function, *args, **kwargs)
future.result() # block then get result