如何根据Spark程序返回的退出代码跳过Airflow SparkSubmitOperator任务



我的Spark提交应用程序正在进行一些查询,返回不同的退出代码取决于数据集状态。

在我的spark-submit操作符之后,是否可以跳过下游任务我正在考虑BashOperator的skip_exit_code功能,它在所有其他运算符中都意外地缺失了。

def spark_job(task_id: str, cfg: ArgList, main_class: str, dag: DAG) -> BaseOperator:
copy_args = cfg.to_arg_list()
return SparkSubmitOperator(
task_id=task_id,
conn_id='spark_default',
java_class=main_class,
application=SPARK_JOBS_JAR,
application_args=copy_args,
total_executor_cores='2',
executor_cores='1',
executor_memory='1g',
num_executors='1',
name=task_id,
verbose=False,
driver_memory='1g',
dag=dag
)
cfg = CheckDataCfg(...)
check_data_task = spark_job('check-data', cfg, 'etljobs.spark.CheckDataRecieved', dag)
check_data_task >> '<my next task which I need to skip sometimes>'

更新:如果returncode不为0,则当前SparkSubmitHook实现确实引发异常。所以我后来只找到了两个解决办法:

  1. 创建自定义SparkSubmitHookSparkSubmitOperator类以忽略用户定义的非零退出代码,从而引发AirflowSkipException异常或将返回代码值推送到XCom以供进一步使用
  2. 请改用BashOperator。它已经支持skip_exit_code功能。您需要在Python中手动构造所有CLI Spark参数,这不是什么大不了的事

SparkSubmitHook有_spark_exit_code,可以在这里使用。我们可以创建一个自定义运算符,该运算符继承所有SparkSubmitOperator功能,并添加返回_spark_exit_code值。

我没有测试它,但我认为以下代码应该适用于您:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.python_operator import ShortCircuitOperator
def shortcircuit_fn(**context):
exit_code = context['ti'].xcom_pull(task_ids='check-data')
if exit_code=='125': # Place the codes here
return True
return False

class MySparkSubmitOperator(SparkSubmitOperator):
def execute(self, context):
super().execute(context)
return self._hook._spark_exit_code
with DAG(dag_id='spar',
default_args=default_args,
schedule_interval=None,
) as dag:
spark_op = MySparkSubmitOperator(task_id='check-data',..., do_xcom_push=True)
short_op = ShortCircuitOperator(task_id='short_circuit', python_callable=shortcircuit_fn)
next_op = AnyOperator()
spark_op >> short_op >> next_op

它是这样工作的:MySparkSubmitOperator将推至_spark_exit_code的值xcom,然后ShortCircuitOperator将根据预期代码对其进行验证。如果满足条件,则工作流将继续,如果不满足条件,将所有下游任务标记为跳过。

相关内容

最新更新