自定义操作符的气流模板不能在函数参数中工作



我正在使用气流中的Jinja的模板来参数化这里描述的操作符。

我将代码修改如下:

class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def f(self, name):
'This function does nothing'
return name
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.name2 = self.f(name)
self.name3 = self.f(self.name)
def execute(self, context):
message = f"Hello from {self.name}"
print(message)
message2 = f"Hello from {self.name2}"
print(message2)
message3 = f"Hello from {self.name3}"
print(message3)
return message
我得到的输出如下:
[2022-12-07, 17:33:23 UTC] {logging_mixin.py:115} INFO - Hello from task_id_1
[2022-12-07, 17:33:23 UTC] {logging_mixin.py:115} INFO - Hello from {{ task_instance.task_id }}
[2022-12-07, 17:33:23 UTC] {logging_mixin.py:115} INFO - Hello from {{ task_instance.task_id }}

谁能解释一下为什么模板只在第一种情况下工作?文档说的是Note that Jinja substitutes the operator attributes and not the args.,但这如何适用于第二种和第三种情况?

您看到的行为是预期的。模板化在运行时发生,因为只有在运行时您才知道如何用实际值替换Jinja字符串。准备要运行的任务涉及许多步骤(模板化是其中一个步骤)。

__init__函数在此之前执行(实际上每次解析DAG时都执行它),因此当您到达模板阶段时,您有:

self.name = {{ task_instance.task_id }}
self.name2 = {{ task_instance.task_id }}
self.name3 = {{ task_instance.task_id }}

但是由于您在template_fields中只列出了name,那么Jinja引擎只运行在name上,这给了您:

self.name = task_id_1
self.name2 = {{ task_instance.task_id }}
self.name3 = {{ task_instance.task_id }}

最新更新