如果"defect"任务失败,是否可以使用不同的"dask"参数重新运行它



考虑一个内存需求事先未知的prefect任务。如果由于工作程序内存不足而导致任务失败,是否可以修改dask工作程序参数并重新运行任务?

如果有一种方法可以在每次失败后将每个工作者的内存分配增加一些值,那就太好了。

很难给出一般的答案,因为这取决于您的基础设施。

  1. 例如,如果您想为Daskcluster_class的每个流运行提供自定义关键字参数,则可以将动态函数传递给DaskExecutorcluster_class。此函数可以从Parameter任务中检索值,如n_workers,如下所示:
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor
def dynamic_executor():
from distributed import LocalCluster
# could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
return LocalCluster(n_workers=prefect.context.parameters["n_workers"])
with Flow(
"dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
flow.add_task(Parameter("n_workers", default=5))

这意味着您可以使用n_workers定义的不同值启动一个新的流运行。

  1. 第二个选项是在每个流运行的基础上在运行配置中分配更多内存,例如,您可以从UI覆盖KubernetesRun上设置的memory_request
with Flow(
FLOW_NAME,
storage=STORAGE,
run_config=KubernetesRun(
labels=["k8s"],
cpu_request=0.5,
memory_request="2Gi",
),
) as flow:

上面的代码片段定义了2GB,但如果您注意到流运行以OOM错误结束,并且您需要更多,则可以从具有更高内存请求的UI触发新的流运行。

  1. 最后一个选项是直接覆盖流定义中的执行器值:
import coiled
from prefect.executors import DaskExecutor
flow.executor = DaskExecutor(
cluster_class=coiled.Cluster,
cluster_kwargs={
"software": "user/software_env_name",
"shutdown_on_close": True,
"name": "prefect-cluster",
"scheduler_memory": "4 GiB",
"worker_memory": "8 GiB",
},
)

只要你使用脚本存储(例如GitHub、Git、Gitlab、Bitbucket等Git存储类之一(而不是pickle存储,并且你用修改后的值worker_memory提交代码,这应该反映在你的新流运行中,因为关于执行器的元数据没有存储在后端,而是从流存储中检索的。

最新更新