是否有方法在每次计划流程运行时更新参数



我正试图找到一种方法来更新计划流的每次迭代所使用的参数begin。

例如,假设我有一个流程,计划在一年中每周一运行一次。对于第一个星期一,流需要使用例如5的参数运行。下周一需要使用参数7等进行运行。每周运行所需的参数将按一个常数变化。

根据文档,我似乎可以为每次运行创建一个带有相应参数的时钟,但对于计划多次运行的流来说,这似乎太过分了。

在Prefect中有更简单的方法吗?

这听起来像是您想将某种形式的业务逻辑封装到Parameter计算中,这是将新的Task添加到Flow的一个很好的用例:

import prefect
from prefect import task, Flow, Parameter

varying_param = Parameter("param", default=None) # none as the default
@task(name="Varying Parameter")
def param_calculation(p):
time = prefect.context.scheduled_start_time 
# if a value was provided, use it
if p is not None:
return p

# do some calculations to decide what value is appropriate
# and return it

with Flow("Minimal example") as flow:
param_value = param_calculation(varying_param)
# now use this value in downstream tasks

最新更新