有没有办法在任何运算符之外使用气流宏?
例如,在 DAG 中,我有一个操作:
datestamp = '{{ ds }}'
print(datestamp) # prints string not the date when I run it for any date
scanner = S3KeySensor(
task_id='scanner',
poke_interval=60,
timeout=24 * 60 * 60,
soft_fail=False,
wildcard_match=True,
bucket_key=getPath() + datestamp, #datestamp correctly replaced with execution date
bucket_name=bucketName,
dag=dag)
因此,当调用扫描仪时,"ds"值被替换为预期的执行日期,但我想在其他一些地方使用"ds"值。但在这种情况下,它不会替换值,而是将整个字符串作为"{{ ds }}"获取。在上面的例子中。print 语句打印"{{ ds }}"而不是执行日期。
幸运的是,bucket_key
是模板化的,只需将 jinja 模板放入其中即可。
…
bucket_key=getPath() + '{{ ds }}',
…
完全在运算符之外,您不能使用这些宏。因为计划程序会定期解释该文件,而不仅仅是在 dag 运行期间。那么,当 dag 未运行时,ds
的值是多少?
但是,由于您不太可能希望在任务之外对其进行任何操作,因此可以将其放入模板化字段中。您还可以扩展要模板化的另一个字段。
class MySensor(S3KeySensor):
template_fields = ('bucket_key', 'bucket_name', 'my_thing')
def __init__(self, my_thing=None, *args, **kwargs):
super(MySensor, self).__init__(*args, **kwargs)
self.my_thing = my_thing
def post_execute(self, context):
logging.info(
"I probably wanted to over-ride poke to use {}".format(self.my_thing)
scanner = MySensor(
my_thing='{{ ds }}',
task_id='scanner',
poke_interval=60,
timeout=24 * 60 * 60,
soft_fail=False,
wildcard_match=True,
bucket_key=getPath() + '{{ ds }}',
bucket_name=bucketName,
dag=dag)
编辑:IIRCself.my_thing
在初始化后不会更改,相反,context.my_thing
将在(?pre_execute
和(execute
被称为。
使用双引号。
datestamp = "{{ ds }}"
print datestamp