如果上一个实例dag运行处于运行状态,则跳过当前dag运行



我有一个dag,每小时安排一次。比方说凌晨01:00、凌晨02:00、凌晨03:00。假设选择了02:00am,但如果01:00am dag运行仍在进行中,则需要取消02:00am实例。

我正在尝试这个代码。

local_tz = pendulum.timezone("America/Chicago")
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2020, 11, 15, tzinfo=local_tz),
'run_as_user': user_id
}
dag = DAG(os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
catchup=False,
max_active_runs=1
schedule_interval='0 * * * *',  #schedule_interval='@hourly'
default_args=default_args
)
def check_prev_dag_run_status(**kwargs):
curr_dag_id = kwargs['dag'].dag_id
curr_task_id = kwargs['task'].task_id
newdate = kwargs['execution_date']
ti = TaskInstance(curr_dag_id, curr_task_id, newdate)
state = ti.current_state()
if state=="running":
raise ValueError("Not all previous tasks successfully completed")

check_success_task = PythonOperator(
task_id='check_status',
python_callable= check_prev_dag_run_status,
provide_context=True,
dag=dag
)
run_this_0 = BashOperator(
task_id='run_shell',
bash_command="ksh runshellscript.ksh",
execution_timeout=None,
dag=dag 
)

我收到错误信息,

[2020-11-17 12:30:07337]{taskinstance.py:1150}错误-"str"对象没有属性"dag_id">

追踪(最近一次通话(:文件"/airflow/bd/pyenv/pycdr/lib/python3.7/site packages/airflow/models/taskinstance.py";,第984行,在_run_raw_task中result=task_copy.exexecute(context=context(

文件"/airflow/bd/pyenv/pycdr/lib/python3.7/site packages/airflow/operators/python_operator.py";,第113行,执行中return_value=self.execute_callable((

文件"/airflow/bd/pyenv/pycdr/lib/python3.7/site packages/airflow/operators/python_operator.py";,第118行,在execute_ca 中

*请给我推荐

  1. 向airflow.models.taskinstance.taskinstance传递参数时缺少什么
  2. execution_de是否提供dag运行的前一个实例?或者如何获取前一个实例dag运行状态*

airflow.models.taskinstance.TaskInstance只接受两个参数,taskexecution_date,而不是代码中的3。此外,task不是task_id,而是定义的任务,在您的示例中,我想它是run_this_0。您需要传递上一次任务运行的execution_date,而不是当前任务。此外,状态可能与跑步不同,但仍然不成功,所以我也会改变这一点。

综合所有这些,以下代码应该可以检查上一次DAG运行的run_this_0是否成功:

def check_prev_dag_run_status(**kwargs):
newdate = kwargs['prev_execution_date']
ti = TaskInstance(run_this_0, newdate)
state = ti.current_state()
if state!="success":
raise ValueError("Not all previous tasks successfully completed")

将此任务的任务concurrenty设置为1。新任务将不会运行,除非上一次运行尚未开始。

设置取决于过去的true。这样做的缺点是,若一个批次失败,下一个批次将不会运行。

使用具有模式重新调度的外部任务传感器来等待较早的批处理完成。

最新更新