在不调用python运算符的情况下读取cli输入



我们希望在dag中的Dagtrigger期间从UI读取传递给dag的cli输入。我试过下面的代码,但不起作用。这里我将输入作为{"kpi":"ID123"}传递我想在我的函数get_data_from_bq 中打印这个ip值

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.models import Variable
from google.cloud import bigquery
from airflow.configuration import conf
LOCATION          = Variable.get("HDM_PROJECT_LOCATION")
PROJECT_ID        = Variable.get("HDM_PROJECT_ID")
client = bigquery.Client()
kpi='{{ kpi}}'
# default arguments
default_dag_args = {
'start_date':days_ago(0),
'retries': 0,
'project_id': PROJECT_ID
}
# Setting airflow environment varriable,getting hdm_batch_details data and updating it
def get_data_from_bq(**kwargs):
print("op is:")
print(kpi)
#Dag Defination
with models.DAG(
'00_test_sql1',
schedule_interval=None,
default_args=default_dag_args) as dag:

v_run_sql_01 = PythonOperator(
task_id='Run_SQL',
provide_context=True,
python_callable=get_data_from_bq,
location=LOCATION,
use_legacy_sql=False)
v_run_sql_01

注意:我不想使用任何运算符来读取从cli 传递的数据

注意:我不想使用任何运算符来读取从cli 传递的数据

这是不可能的。只有当有任务要运行时,才会创建Dag Run。

你应该明白:

  • DAG+其顶级代码-构建由任务组成的DAG结构

  • DAG运行->是DAG运行的单个实例,其中包含要执行的任务实例。Dag Run简单地由属于Dag Run的任务实例组成;dag-run";。

您传递的配置是";dag_run.conf";而不是";dag.conf"-这意味着它只为DagRun指定,而DagRun只对属于它的所有任务实例有效。

只有任务实例可以访问dag_run.conf

最新更新