我有以下目录结构
.
├── ConfigSpark.yaml
├── project1
│ ├── dags
│ │ └── dag_1.py
│ └── sparkjob
│ └── spark_1.py
└── sparkutils
我正在尝试导入deConfigSpark.yaml
文件在我的SparkKubernetesOperator
使用:
job= SparkKubernetesOperator(
task_id = 'job',
params=dict(
app_name='job',
mainApplicationFile='/opt/airflow/dags/project1/sparkjob/spark_1.py',
driverCores=1,
driverCoreRequest='250m',
driverCoreLimit='500m',
driverMemory='2G',
executorInstances=1,
executorCores=2,
executorCoreRequest='1000m',
executorCoreLimit='1000m',
executorMemory='2G'
),
application_file='/opt/airflow/dags/ConfigSpark.yaml',
kubernetes_conn_id='conn_prd_eks',
do_xcom_push=True
)
My dag返回以下错误:
jinja2.exceptions.TemplateNotFound: /opt/airflow/dags/ConfigSpark.yaml
我注意到,如果DAG在ConfigSpark的同一目录下。yaml我的任务运行完美,但为什么我的任务不运行,当我把我的日期在子文件夹?
我检查了我的values.yaml
文件,airflowHome
是/opt/airflow
,defaultAirflowRepository
是apache/airflow
。
发生了什么事?
Airflow从存储DAG文件的目录中搜索模板文件(在您的示例中为ConfigSpark.yaml
)。因此,它不会在你的代码中自动找到它。
如果您想将模板文件存储在DAG文件所在的文件夹(/project1/dags
)中,或者将模板文件存储在/project1/dags
文件夹中的嵌套文件夹中,您可以在任务中指定路径:
job = SparkKubernetesOperator(
...,
application_file='/path/to/ConfigSpark.yaml',
...
)
从/project1/dags/path/to/ConfigSpark.yaml
中读取模板文件。
但是,如果模板文件所在的文件夹不是DAG文件所在文件夹的子文件夹,则上述操作将不起作用。在这种情况下,您可以在dag级别指定template_searchpath
:
with DAG(..., template_searchpath="/opt/airflow/dags/repo/dags") as dag:
job = SparkKubernetesOperator(
task_id='job',
application_file='ConfigSpark.yaml',
...,
)
这个路径(例如/opt/airflow/dags
)被添加到Jinja搜索路径中,这样ConfigSpark.yaml
就会被找到。