DBT从玛瑙获取值.行到字符串

  • 本文关键字:字符串 获取 DBT dbt
  • 更新时间 :
  • 英文 :


我想在COPY INTO语句中运行一个宏到S3桶。显然在snowflake中我不能做动态路径。所以我要用一种简单的方法来解决这个问题。

{% macro unload_snowflake_to_s3() %}
    {# Get all tables and views from the information schema. #}
    {%- set query -%}
        select concat('COPY INTO @MY_STAGE/year=', year(current_date()), '/my_file FROM (SELECT OBJECT_CONSTRUCT(*) from my_table)');
    {%- endset -%}
    -- {%- set final_query = run_query(query) -%}
    -- {{ dbt_utils.log_info(final_query) }}
    -- {{ dbt_utils.log_info(final_query.rows.values()[0]) }}

    {%- do run_query(final_query.columns.values()[0]) -%}
    -- {% do final_query.print_table() %}
{% endmacro %}

基于上面的宏,我想做的是:

  1. 使用CONCAT在桶路径中添加year。因此,查询变成一个字符串。
  2. 再次使用连接到do run_query()的查询来实际运行COPY INTO语句。

从dbt log中得到的输出和错误:

09:06:08  09:06:08 + | column                                                                                                                                                                                                                                                          | data_type |
| ----------------------------------------------------------------------------------------------------------- | --------- |
| COPY INTO @MY_STAGE/year=', year(current_date()), '/my_file FROM (SELECT OBJECT_CONSTRUCT(*) from my_table) | Text      |
09:06:08  09:06:08 + <agate.Row: ('COPY INTO @MY_STAGE/year=2022/my_file FROM (SELECT OBJECT_CONSTRUCT(*) from my_table)')>
09:06:09  Encountered an error while running operation: Database Error
  001003 (42000): SQL compilation error:
  syntax error line 1 at position 0 unexpected '<'.
root@2c50ba8af043:/dbt# 

我认为错误是我没有提取行和列具体是在agate格式。我如何将其转换/提取为字符串?

你可能会有更好的运气与dbt_utils.get_query_results_as_dict

但是您不需要使用数据库来构造该路径。jinja上下文有一个run_started_at变量,它是一个Python datetime对象,因此您可以在jinja中构建字符串,而无需访问数据库:

{% set yr = run_started_at.strftime("%Y") %}
{% set query = 'COPY INTO @MY_STAGE/year=' ~ yr ~ '/my_file FROM (SELECT OBJECT_CONSTRUCT(*) from my_table)' %}

最后,根据你如何调用这个宏,你可能想要用{% if execute %}标志来控制整个事情,所以dbt在解析你的模型时不做COPY

您可以使用dbt_utils.get_query_results_as_dict函数来删除玛瑙部分。也许在那之后你的复制语句可以工作。

{%- set final_query = dbt_utils.get_query_results_as_dict(query) -%}
    {{log(final_query ,true)}}
    {% for keys,val in final_query.items() %}
        
        {{log(keys,true)}}
        {{log( val ,true)}}
    {% endfor %}

如果你像这样运行,你会看到('COPY INTO @MY_STAGE/year=', year(current_date())...'),最后删除('')

{%- set final_val=val | replace('(', '')| replace(')', '') | replace("'", '') -%}

最新更新