我有一个dag,每小时安排一次。比方说凌晨01:00、凌晨02:00、凌晨03:00。假设选择了02:00am,但如果01:00am dag运行仍在进行中,则需要取消02:00am实例。
我正在尝试这个代码。
local_tz = pendulum.timezone("America/Chicago")
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2020, 11, 15, tzinfo=local_tz),
'run_as_user': user_id
}
dag = DAG(os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
catchup=False,
max_active_runs=1
schedule_interval='0 * * * *', #schedule_interval='@hourly'
default_args=default_args
)
def check_prev_dag_run_status(**kwargs):
curr_dag_id = kwargs['dag'].dag_id
curr_task_id = kwargs['task'].task_id
newdate = kwargs['execution_date']
ti = TaskInstance(curr_dag_id, curr_task_id, newdate)
state = ti.current_state()
if state=="running":
raise ValueError("Not all previous tasks successfully completed")
check_success_task = PythonOperator(
task_id='check_status',
python_callable= check_prev_dag_run_status,
provide_context=True,
dag=dag
)
run_this_0 = BashOperator(
task_id='run_shell',
bash_command="ksh runshellscript.ksh",
execution_timeout=None,
dag=dag
)
我收到错误信息,
[2020-11-17 12:30:07337]{taskinstance.py:1150}错误-"str"对象没有属性"dag_id">
追踪(最近一次通话(:文件"/airflow/bd/pyenv/pycdr/lib/python3.7/site packages/airflow/models/taskinstance.py";,第984行,在_run_raw_task中result=task_copy.exexecute(context=context(
文件"/airflow/bd/pyenv/pycdr/lib/python3.7/site packages/airflow/operators/python_operator.py";,第113行,执行中return_value=self.execute_callable((
文件"/airflow/bd/pyenv/pycdr/lib/python3.7/site packages/airflow/operators/python_operator.py";,第118行,在execute_ca 中
*请给我推荐
- 向airflow.models.taskinstance.taskinstance传递参数时缺少什么
- execution_de是否提供dag运行的前一个实例?或者如何获取前一个实例dag运行状态*
airflow.models.taskinstance.TaskInstance
只接受两个参数,task
和execution_date
,而不是代码中的3。此外,task
不是task_id
,而是定义的任务,在您的示例中,我想它是run_this_0
。您需要传递上一次任务运行的execution_date
,而不是当前任务。此外,状态可能与跑步不同,但仍然不成功,所以我也会改变这一点。
综合所有这些,以下代码应该可以检查上一次DAG运行的run_this_0
是否成功:
def check_prev_dag_run_status(**kwargs):
newdate = kwargs['prev_execution_date']
ti = TaskInstance(run_this_0, newdate)
state = ti.current_state()
if state!="success":
raise ValueError("Not all previous tasks successfully completed")
将此任务的任务concurrenty设置为1。新任务将不会运行,除非上一次运行尚未开始。
设置取决于过去的true。这样做的缺点是,若一个批次失败,下一个批次将不会运行。
使用具有模式重新调度的外部任务传感器来等待较早的批处理完成。