How can I use the result of a BashOperator task as a parameter for another Airflow task?



I need to pass a job_id parameter to my DatabricksRunNowOperator object. The job_id is the result of executing the databricks jobs create --json '{myjson}' command:

$ databricks jobs create --json '{myjson}'

{job_id: 12}

import os
import subprocess
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.databricks_operator import DatabricksRunNowOperator

def pull_function():
    returned_output = subprocess.check_output("echo ti.xcom_pull(key='jobid_CreateCreateRobot')")
    return returned_output

dag_CreateRobot = DAG(dag_id='CreateRobot',
                      default_args={'owner': 'eric',
                                    'email': [],
                                    'depends_on_past': False,
                                    'start_date': '2019-09-16 16:48:28.803023',
                                    'provide_context': True},
                      schedule_interval='@once')

CreateRobot = BashOperator(dag=dag_CreateRobot,
                           task_id='CreateRobot',
                           bash_command="databricks jobs create --json '{myjson}'")

RunRobot = DatabricksRunNowOperator(dag=dag_CreateRobot,
                                    task_id=ti.xcom_pull('RunCreateRobot'),
                                    job_id=pull_function(),
                                    databricks_conn_id='myconn',
                                    json={'token': 'mytoken'})

RunRobot.set_upstream(CreateRobot)

I wrote this code to illustrate my goal, but it does not work. How can I use the result of a BashOperator task in other tasks that depend on it?

The bash command in the BashOperator needs to be databricks jobs create --json '{myjson}':

CreateRobot = BashOperator(dag=dag_CreateRobot,
                           task_id='CreateRobot',
                           bash_command="databricks jobs create --json '{myjson}'",
                           xcom_push=True)  # specify xcom_push=True in older Airflow versions

When it runs, the operator above pushes the last line of its standard output to XCom (see https://airflow.apache.org/_modules/airflow/operators/bash_operator.html).

The XCom value can then be accessed with:

ti.xcom_pull(task_ids='CreateRobot')
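
To wire this into the downstream task, one option is an intermediate PythonOperator that parses the pushed output and returns the job_id, which DatabricksRunNowOperator can then pull through its templated json field. Below is a minimal sketch, assuming the CLI prints {"job_id": 12} as its last output line; the ExtractJobId task and the extract_job_id helper are illustrative names, not part of the original code:

import json
from airflow.operators.python_operator import PythonOperator

def extract_job_id(**context):
    # Pull the last stdout line that CreateRobot pushed to XCom,
    # e.g. '{"job_id": 12}', and return just the job_id value.
    output = context['ti'].xcom_pull(task_ids='CreateRobot')
    return json.loads(output)['job_id']

ExtractJobId = PythonOperator(dag=dag_CreateRobot,
                              task_id='ExtractJobId',
                              python_callable=extract_job_id,
                              provide_context=True)

RunRobot = DatabricksRunNowOperator(dag=dag_CreateRobot,
                                    task_id='RunRobot',
                                    databricks_conn_id='myconn',
                                    # 'json' is a templated field, so the Jinja
                                    # expression is rendered at run time, after
                                    # the upstream tasks have finished
                                    json={'job_id': "{{ ti.xcom_pull(task_ids='ExtractJobId') }}"})

RunRobot.set_upstream(ExtractJobId)
ExtractJobId.set_upstream(CreateRobot)

The return value of the python_callable is pushed to XCom automatically, so extract_job_id needs no explicit xcom_push call. Note that the Jinja expression renders to a string; if the Databricks job requires a numeric job_id, convert the value inside the callable or verify the operator's type coercion behavior for your Airflow version.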
