气流:如何将json/dict格式的变量传递给运算符



例如,我需要知道如何将注册的字典作为运算符参数中的变量传递,以启动databricks笔记本。

就我而言,我尝试过一些东西,但没有奏效。

我在Airflow中保存了这个变量:

"dictionaries" : {
"dict1" : {
"a" : 1,
"b" : 2
}

并且这个代码试图检索变量

dict = "dict1"
values = f"{{{{ var.json.dictionaries.{dict} }}}}"

稍后将其作为字典与操作员参数中的其他值一起传递

task1 = DatabricksRunNowOperator(
task_id=f'Databricks_{dict1}',
databricks_conn_id='databricks',
job_id= 1111,
notebook_params={"param1": "param1" , **values}

这是失败的,因为变量以str"形式出现;TypeError:"str"对象不是映射",所以我尝试使用json库来尝试转换格式,但没有成功,我收到了错误消息";需要用双引号括起来的属性名称";

所以我怀疑它可能是用单引号检索变量,我也尝试过使用replace将它们改为双引号,但也没有成功。

json.loads(values.replace("'","""))

使用jinja来检索这个变量的行为可能不一样,我有点迷失了方向,我已经尝试了上一次

values = json.loads(f"{{{{ (var.json.dictionaries.{dict1}).replace(''','"') }}}}")
values = json.loads((f"{{{{ var.json.dictionaries.{dict1} }}}}").replace(''','"'))

也在操作员内部

notebook_params={"param1": "param1", **json.loads((f"{{{{ var.json.dictionaries.{dict1} }}}}").replace(''','"'))}

但是得到相同的错误";json.decoder.JSONDecodeError:需要用双引号括起来的属性名";

当然,我误解了恢复变量并转换它的方法,如果有人能帮助我,我将不胜感激。

问候

您必须保存变量的值,如下所示:

{
"dictionaries" : {
"dict1" : {
"a" : 1,
"b" : 2
}
}
}

当您访问代码中的变量时,传递deserialize_json=True。假设您的变量当时被称为"json",

from airflow.models import Variable
json_data = Variable.get('json', deserialize_json=True)
dict1 = json_data['dictionaries']['dict1']

最新更新