在操作员外部使用气流宏



有没有办法在任何运算符之外使用气流宏?

例如,在 DAG 中,我有一个操作:

datestamp = '{{ ds }}'
print(datestamp) # prints string not the date when I run it for any date
scanner = S3KeySensor(
task_id='scanner',
poke_interval=60,
timeout=24 * 60 * 60,
soft_fail=False,
wildcard_match=True,
bucket_key=getPath() + datestamp, #datestamp correctly replaced with execution date
bucket_name=bucketName,
dag=dag)

因此,当调用扫描仪时,"ds"值被替换为预期的执行日期,但我想在其他一些地方使用"ds"值。但在这种情况下,它不会替换值,而是将整个字符串作为"{{ ds }}"获取。在上面的例子中。print 语句打印"{{ ds }}"而不是执行日期。

幸运的是,bucket_key是模板化的,只需将 jinja 模板放入其中即可。

…
bucket_key=getPath() + '{{ ds }}',
…

完全在运算符之外,您不能使用这些宏。因为计划程序会定期解释该文件,而不仅仅是在 dag 运行期间。那么,当 dag 未运行时,ds的值是多少?

但是,由于您不太可能希望在任务之外对其进行任何操作,因此可以将其放入模板化字段中。您还可以扩展要模板化的另一个字段。

class MySensor(S3KeySensor):
template_fields = ('bucket_key', 'bucket_name', 'my_thing')
def __init__(self, my_thing=None, *args, **kwargs):
super(MySensor, self).__init__(*args, **kwargs)
self.my_thing = my_thing
def post_execute(self, context):
logging.info(
"I probably wanted to over-ride poke to use {}".format(self.my_thing)
scanner = MySensor(
my_thing='{{ ds }}',
task_id='scanner',
poke_interval=60,
timeout=24 * 60 * 60,
soft_fail=False,
wildcard_match=True,
bucket_key=getPath() + '{{ ds }}',
bucket_name=bucketName,
dag=dag)

编辑:IIRCself.my_thing在初始化后不会更改,相反,context.my_thing将在(?pre_execute和(execute被称为。

使用双引号。

datestamp = "{{ ds }}"
print datestamp

最新更新