从使用气流KubernetesPodOperator启动的Pod中获取上下文



我们有一些使用KubernetesPodOperator启动pod的数据包,我试图在pod内获得一些信息,如dag_id, task_id, try_number,环境等。

我知道我可以从气流任务的上下文中获得此信息(例如,Python Operator上的kwargs),但我一直在想,是否有一种方法可以从启动的pod中获得该上下文?

谢谢!

我找到了一个很好的解决方案

我为KubernetesPodOperator类制作了一个自定义包装器,并使用气流任务的上下文更新env_vars

import airflow.configuration as config
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator as AirflowKubernetesPodOperator
class KubernetesPodOperator(AirflowKubernetesPodOperator):
def execute(self, context):
environment = config.conf.get('webserver', 'web_server_name')
ti = context['ti']
dag_id = ti.dag_id
task_id = ti.task_id
run_id = context['run_id']
try_number = str(ti._try_number)
labels = { 
'ENVIRONMENT' : environment,
'DAG_ID'      : dag_id, 
'TASK_ID'     : task_id, 
'RUN_ID'      : run_id,
'TRY_NUMBER'  : try_number,
}
self.env_vars.update(labels)
super().execute(context)

相关内容

  • 没有找到相关文章

最新更新