我的代码看起来像这样
def myfunc(param):
# expensive stuff that takes 2-3h
mylist = [...]
client = Client(...)
mgr = DeploymentMgr()
# ... setup stateful set ...
futures = client.map(myfunc, mylist, ..., resources={mgr.hash.upper(): 1})
client.gather(futures)
我在 Kubernetes 集群上运行了 dask。在程序开始时,我创建了一个有状态的集合。这是通过kubernetes.client.AppsV1Api()
完成的。然后,我最多等待 30 分钟,直到我请求的所有工作人员都可用。对于此示例,假设我请求 10 个工作线程,但在 30 分钟后,只有 7 个工作线程可用。最后,我调用client.map()
并向其传递函数和列表。此列表包含 10 个元素。但是,dask 只会使用 7 个工人来处理此列表!即使几分钟后剩余的 3 个工作线程可用,dask 也不会为它们分配任何列表元素,即使第一个元素的处理都没有完成。
我怎样才能改变 dask 的行为?有没有办法告诉 dask(或 dask 的调度程序(定期检查新来的工人并更"正确"地分配工作?或者我可以手动影响这些列表元素的分布吗?
谢谢。
Dask 将在更好地了解任务需要多长时间后平衡负载。 您可以使用配置值给出任务长度的估计
值distributed:
scheduler:
default-task-durations:
myfunc: 1hr
或者,一旦 Dask 完成了其中一项任务,它将知道如何在未来围绕该任务做出决策。
我相信这也在 GitHub 问题跟踪器上出现过几次。 您可能需要搜索 https://github.com/dask/distributed/issues 以获取更多信息。