我正在制作气流1.10。
我在KubernetesPodOperator上运行命令有问题,其中整个命令在DAG运行时被评估。
我在DAG运行时生成命令,因为一些命令的参数取决于用户传递的参数。
正如我从文档中读到的KubernetesPodOperator期望字符串列表或jinja模板列表:
:param arguments: arguments of the entrypoint. (templated)
The docker image's CMD is used if this is not provided.
我有PythonOperator生成命令并将其推送到XCOM和KubernetesPodOperator在参数中,我传递由PythonOperator生成的命令。
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
def command_maker():
import random # random is to illustrate that we don't know arguments value before runtime
return f"my_command {random.randint(1, 10)} --option {random.randint(1, 4)}"
def create_tasks(dag):
first = PythonOperator(
task_id="generate_command",
python_callable=command_maker,
provide_context=True,
dag=dag,
)
second = KubernetesPodOperator(
namespace='some_namespace',
image='some_image',
name='execute_command',
dag=dag,
arguments=[f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="return_value")}}']
)
second.set_upstream(first)
不幸的是,KubernetesPodOperator没有正确地运行这个命令,因为他试图运行这样的东西:
[my_command 4 --option 2]
是否有方法在KubernetesPodOperator运行时计算这个列表还是强迫我将所有运行时参数都放到单独的《XCOM》中?我想避免这样的解决方案,因为它需要对我的项目进行大量更改。
arguments=[
"my_command",
f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="first_argument")}}',
"--option",
f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="second_argument")}}',
]
问题是JINJA模板默认将模板作为字符串返回。
在最近的气流中,然而(从Airflow 2.1.0开始),你可以将模板渲染为原生python对象:
https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html rendering-fields-as-native-python-objects
创建DAG时使用render_template_as_native_obj=True
参数
然后你需要格式化你的输出,使python的literal_eval
能够将其转换为python对象。在您的示例中,您必须使输出类似于:
[ 'my_command', '4', '--option', '2' ]
请注意,该参数将返回所有模板的本机对象,因此如果它们返回一些literal_eval可以理解的值,它们也将被转换为本机类型(并且可能会产生一些意想不到的副作用)。
要为气流1.10提供工作解决方案,我必须使用BaseOperator。pre_execute钩。
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.lineage import prepare_lineage
class UnpackCommandKubernetesPodOperator(KubernetesPodOperator):
@prepare_lineage
def pre_execute(self, context):
self.arguments = self.arguments[0].split(" ")