气流终止EMR集群



我正在使用EMR集群来运行一些并行运行的作业。这两个作业都在同一个集群中运行。我已经将action_on_failure字段设置为'CONTINUE',这样,如果一个任务失败,另一个任务应该在集群中运行。我希望在完成这两个任务后,无论成功或失败,都能运行EMRMTerminateCluster的最终任务。

task2 
task1 >>       >> task4
task3 

我希望我的dags以这样一种方式运行,即任务4只在任务2和任务3之后开始。

有什么办法吗?

我正在回答您关于终止EMR集群的一个问题,以避免不必要的云计费

  • 您需要在DAG 中的EMR终止任务id中将气流触发规则设置为all_done

  • 无论上游任务是失败还是成功,都会执行具有all_done触发规则的任务id。

  • 终止EMR任务id 的示例代码

terminate_emr_cluster = PythonOperator(
task_id='terminate_emr_cluster',
python_callable=terminate_cluster,
op_args=['{{ ti.xcom_pull("create_emr_cluster")["JobFlowId"]}}'],
dag=dag, 
trigger_rule="all_done",
)

从这里获取DAG代码-https://github.com/SatadruMukherjee/Data-Preprocessing-Models/blob/main/airflow_emr_spark_s3_snowflake.py

最新更新