KubernetesPodOperator语言 - 运行从XCOM中提取的整个命令



我正在制作气流1.10。

我在KubernetesPodOperator上运行命令有问题,其中整个命令在DAG运行时被评估。

我在DAG运行时生成命令,因为一些命令的参数取决于用户传递的参数。

正如我从文档中读到的KubernetesPodOperator期望字符串列表或jinja模板列表:

:param arguments: arguments of the entrypoint. (templated)
The docker image's CMD is used if this is not provided.

我有PythonOperator生成命令并将其推送到XCOM和KubernetesPodOperator在参数中,我传递由PythonOperator生成的命令。

from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

def command_maker():
import random # random is to illustrate that we don't know arguments value before runtime
return f"my_command {random.randint(1, 10)} --option {random.randint(1, 4)}"
def create_tasks(dag):
first = PythonOperator(
task_id="generate_command",
python_callable=command_maker,
provide_context=True,
dag=dag,
)
second = KubernetesPodOperator(
namespace='some_namespace',
image='some_image',
name='execute_command',
dag=dag,
arguments=[f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="return_value")}}']
)
second.set_upstream(first)

不幸的是,KubernetesPodOperator没有正确地运行这个命令,因为他试图运行这样的东西:

[my_command 4 --option 2]

是否有方法在KubernetesPodOperator运行时计算这个列表还是强迫我将所有运行时参数都放到单独的《XCOM》中?我想避免这样的解决方案,因为它需要对我的项目进行大量更改。

arguments=[
"my_command",
f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="first_argument")}}',
"--option",
f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="second_argument")}}',
]

问题是JINJA模板默认将模板作为字符串返回。

在最近的气流中,然而(从Airflow 2.1.0开始),你可以将模板渲染为原生python对象:

https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html rendering-fields-as-native-python-objects

创建DAG时使用render_template_as_native_obj=True参数

然后你需要格式化你的输出,使python的literal_eval能够将其转换为python对象。在您的示例中,您必须使输出类似于:

[ 'my_command', '4', '--option', '2' ]

请注意,该参数将返回所有模板的本机对象,因此如果它们返回一些literal_eval可以理解的值,它们也将被转换为本机类型(并且可能会产生一些意想不到的副作用)。

要为气流1.10提供工作解决方案,我必须使用BaseOperator。pre_execute钩。

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.lineage import prepare_lineage
class UnpackCommandKubernetesPodOperator(KubernetesPodOperator):
@prepare_lineage
def pre_execute(self, context):
self.arguments = self.arguments[0].split(" ")

相关内容

  • 没有找到相关文章

最新更新