在我们的编排中,我们使用一个DAG来触发其他子DAG,直到这些子DAG没有完成,我们的主DAG将运行以检查状态。我们使用5分钟的睡眠时间来检查每5分钟后子进程的状态。由于此任务持续处于运行状态,因此它在一个worker上消耗资源。最近我们了解了up_for_reschedule。这是否解决了我释放工人的问题?是否可以使用up_for_reschedule与python操作符?如果有,有没有什么文件可以参考?
这里有几个选项。
- 你可以在重新调度模式下使用PythonSensor而不是Operator。在重调度模式下,如果传感器con ,则气流将重新调度任务实例。
task = PythonSensor(
task_id='sensor_example',
mode='reschedule',
python_callable=func
)
- 你可以链接一堆TriggerDagRunOperoator。但是,如果您只想创建狗跑而不想等待状态检查,那么这是最好的。与传感器不同,它没有重新调度模式。因此,当
wait_for_completion
为True时,它将占用工作槽