我正在构建一个模型,根据查询结果动态引用表名和模式名。
{%- set query %}
select * from master_data.event_metadata where game='{{game_name}}'
{% endset -%}
{%- set results = run_query(query) -%}
{%- if execute %}
{%- set record = results.rows[0] -%}
{% else %}
{%- set record = [] -%}
{% endif -%}
其中两个值在record.SCHEMA_NAME
和record.TABLE_NAME
中。我可以用之类的东西
select
*
from
{{record.SCHEMA_NAME}}.{{record.TABLE_NAME}}
但我更愿意使用source()
函数,这样我的文档和DAG就干净了。如何将record.SCHEMA_NAME
和record.TABLE_NAME
解析为字符串参数。我需要一些类似的东西
select
*
from
{{ source(record.SCHEMA_NAME, record.TABLE_NAME) }}
当我尝试运行上述程序时,我会得到以下错误:
Server error: Compilation Error in rpc request (from remote system)
The source name (first) argument to source() must be a string, got <class 'jinja2.runtime.Undefined'>
您可能已经找到了解决方法或解决方案,但只是为了防止其他人出现同样的情况。。。
要将值转换为字符串,可以使用|string
。例如:
record.SCHEMA_NAME|string
record.TABLE_NAME|string
所以你的查询看起来像这样:
select * from {{ source(record.SCHEMA_NAME|string|lower, record.TABLE_NAME|string|lower) }}
请注意,根据查询的输出和定义源文件的方式,您可能需要lower
或upper
您的值才能与源匹配。
问题
您的record
变量是执行(run_query(query)
(的结果。当你做dbt compile/run
dbt会做一系列操作,比如读取你项目的所有文件,生成一个";manifest.json";文件,并将使用ref
/source
生成DAG,因此此时,不执行SQL,换句话说,execute == False
。
在您的示例中,即使执行了record.SCHEMA_NAME|string
,也将无法检索该变量的值,因为没有执行任何内容,并且由于执行了if not execute then record = []
,您将得到消息... depends on a source named '.' which was not found
,因为此时record
为空。
一种变通方法是将模型的查询封装在if execute
块中,类似于以下内容:
{% if execute %}
select * from {{ source(record.TABLE_SCHEMA|string|lower, record.TABLE_NAME|string|lower) }}
{% endif %}
使用这种方法,您将能够动态地设置模型的源代码。
但不幸的是,这不会像您预期的那样工作,因为它不会为该模型生成DAG。使用if execute
块包装模型的查询将阻止dbt为模型生成DAG。
最后,这将与您第一次尝试在不使用source
函数的情况下声明schema
和table
相同。
有关更多详细信息,您可以查看有关execute mode
的dbt文档:https://docs.getdbt.com/reference/dbt-jinja-functions/execute
我认为在将这两个对象传递给source
宏之前,需要先将它们转换为字符串表示。
试试这个
select
*
from
{{ source(record.SCHEMA_NAME|string, record.TABLE_NAME||string) }}