Why does the Airflow KubernetesPodOperator not resolve job trigger parameters in resources?


  1. Problem: I used the Airflow KubernetesPodOperator with resources taken from the job trigger parameters, but got the following error log:
[2020-12-24 12:19:36,166] {taskinstance.py:1150} ERROR - (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '683111d0-7af3-45c5-9bdc-26353fc7e140', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Thu, 24 Dec 2020 12:19:36 GMT', 'Content-Length': '584'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Resources: v1.ResourceRequirements.Limits: unmarshalerDecoder: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$', error found in #10 byte of ...|.conf.cpu", "memory"|..., bigger context ...|"resources": {"limits": {"cpu": "dag_run.conf.cpu", "memory": "dag_run.conf.memory"}, "requests": {"|...","reason":"BadRequest","code":400}

It seems the parameters dag_run.conf.cpu and dag_run.conf.memory are not being resolved! I have already tried setting the resources to static values, and that works, but I don't understand why the parameters are not resolved.
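One way to see why only some arguments pick up Jinja values is to print the operator's template_fields; anything not listed there is passed to the pod verbatim. A minimal check, assuming the same contrib import path as the code below:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Only the arguments named in template_fields are rendered by Jinja;
# any other argument (including resources) keeps its literal string value.
print(KubernetesPodOperator.template_fields)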

  1. Code: the code is shown below
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import configuration as conf

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('example_kubernetes_pod',
          schedule_interval='@once',
          default_args=default_args)

# this does not work and fails with the error above
with dag:
    k = KubernetesPodOperator(
        namespace='test',
        image="hello-world",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster,
        cluster_context='docker-for-desktop',
        config_file=config_file,
        resources={"request_cpu": "{{ dag_run.conf.cpu }}",
                   "request_memory": "{{ dag_run.conf.memory }}",
                   "limit_cpu": "{{ dag_run.conf.cpu }}",
                   "limit_memory": "{{ dag_run.conf.memory }}"},
    )
# it works
with dag:
    k = KubernetesPodOperator(
        namespace='test',
        image="hello-world",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster,
        cluster_context='docker-for-desktop',
        config_file=config_file,
        resources={"request_cpu": "4",
                   "request_memory": "8G",
                   "limit_cpu": "4",
                   "limit_memory": "8G"},
    )
  1. I triggered this job with the following parameters:
{
    "cpu": "4",
    "memory": "8G"
}
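To confirm that these values actually reach the DAG run, a small debug task can print the conf at run time. A minimal sketch, assuming the Airflow 1.10.x APIs used above; the task id and callable name are made up:

from airflow.operators.python_operator import PythonOperator

def print_trigger_conf(**context):
    # dag_run.conf holds the JSON passed at trigger time, e.g. {"cpu": "4", "memory": "8G"}
    print(context['dag_run'].conf)

with dag:
    show_conf = PythonOperator(
        task_id='show-trigger-conf',
        python_callable=print_trigger_conf,
        provide_context=True,  # needed on Airflow 1.10.x
    )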

Have you tried using the following syntax: {{ dag_run.conf['conf1'] }}?

Something like this:

with dag:
    k = KubernetesPodOperator(
        namespace='test',
        image="hello-world",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster,
        cluster_context='docker-for-desktop',
        config_file=config_file,
        resources={"request_cpu": "{{ dag_run.conf['cpu'] }}",
                   "request_memory": "{{ dag_run.conf['memory'] }}",
                   "limit_cpu": "{{ dag_run.conf['cpu'] }}",
                   "limit_memory": "{{ dag_run.conf['memory'] }}"},
    )

I read about it here: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#passing-parameters-when-triggering-dags
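For fields that are templated, that dict-access syntax does render. For example, env_vars is (as far as I recall) one of the KubernetesPodOperator's templated fields, so the same expressions could be passed to the container as environment variables. A sketch only, reusing the dag, in_cluster and config_file from the question; it does not set actual Kubernetes resource limits:

with dag:
    pass_conf_as_env = KubernetesPodOperator(
        namespace='test',
        image="hello-world",
        name="airflow-test-pod-env",
        task_id="pass-conf-as-env",
        in_cluster=in_cluster,
        cluster_context='docker-for-desktop',
        config_file=config_file,
        # env_vars is listed in template_fields, so these Jinja expressions
        # are rendered from dag_run.conf at run time
        env_vars={"REQUESTED_CPU": "{{ dag_run.conf['cpu'] }}",
                  "REQUESTED_MEMORY": "{{ dag_run.conf['memory'] }}"},
    )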

Airflow (currently 2.1) does not support templating of resources.
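Given that, one version-agnostic workaround is to stop templating resources altogether and instead define one task per static resource preset, picking between them at run time by branching on dag_run.conf. A minimal sketch; the "size" conf key, task ids and presets are made up, and the cluster/connection arguments from the question would be added to each pod task in the same way:

from airflow.operators.python_operator import BranchPythonOperator

def choose_pod_size(**context):
    conf = context['dag_run'].conf or {}
    # e.g. trigger with {"size": "large"}; anything else falls back to the small preset
    return 'pod-large' if conf.get('size') == 'large' else 'pod-small'

with dag:
    branch = BranchPythonOperator(
        task_id='choose-pod-size',
        python_callable=choose_pod_size,
        provide_context=True,  # needed on Airflow 1.10.x
    )
    pod_small = KubernetesPodOperator(
        namespace='test',
        image="hello-world",
        name="airflow-test-pod-small",
        task_id="pod-small",
        resources={"request_cpu": "1", "request_memory": "2G",
                   "limit_cpu": "1", "limit_memory": "2G"},
    )
    pod_large = KubernetesPodOperator(
        namespace='test',
        image="hello-world",
        name="airflow-test-pod-large",
        task_id="pod-large",
        resources={"request_cpu": "4", "request_memory": "8G",
                   "limit_cpu": "4", "limit_memory": "8G"},
    )
    branch >> [pod_small, pod_large]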

LATEST UPDATE