我正试图找到一种方法来更新计划流的每次迭代所使用的参数begin。
例如,假设我有一个流程,计划在一年中每周一运行一次。对于第一个星期一,流需要使用例如5的参数运行。下周一需要使用参数7等进行运行。每周运行所需的参数将按一个常数变化。
根据文档,我似乎可以为每次运行创建一个带有相应参数的时钟,但对于计划多次运行的流来说,这似乎太过分了。
在Prefect中有更简单的方法吗?
这听起来像是您想将某种形式的业务逻辑封装到Parameter
计算中,这是将新的Task
添加到Flow的一个很好的用例:
import prefect
from prefect import task, Flow, Parameter
varying_param = Parameter("param", default=None) # none as the default
@task(name="Varying Parameter")
def param_calculation(p):
time = prefect.context.scheduled_start_time
# if a value was provided, use it
if p is not None:
return p
# do some calculations to decide what value is appropriate
# and return it
with Flow("Minimal example") as flow:
param_value = param_calculation(varying_param)
# now use this value in downstream tasks