Snowflake|Airflow |谷歌云:如何以csv格式从Snowflake表中卸载数据,并使用Airflow上传



有没有办法将数据从Snowflake卸载到csv格式,或者可以直接以csv格式存储在谷歌云存储中?

我们使用composer(airflow(dags连接到snowflake,将表中的数据卸载到csv文件中,并将其存储在谷歌云存储中,然后进一步迁移。

我尝试过的:

从雪花表中查询数据并将其获取到变量中。

我想进一步做什么:

将数据转换为csv文件(因为还没有运行代码(,并将其迁移到GCS存储桶,但气流中似乎只有GCStoGCSoperator,这对此没有帮助。

我的想法:

  1. 如果我应该使用带有调度器的python文件,而不是在DAG中编写
  2. 通过数据流(beam(完成并在composer上运行

代码:-

def func(**context):
dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
result = dwh_hook.get_first("select col1,col2,col3,col4,col5 from table_name where col_name = previous_date_func_here")
# print(result)

我还没有测试它,因为我想用GCS测试它,但它似乎不起作用。有什么办法?真的有气流可以做到这一点吗?

Snowflake支持使用COPY INTO位置命令卸载数据:

将表(或查询(中的数据卸载到以下位置之一的一个或多个文件中:

  • 命名的内部阶段(或表/用户阶段(。然后可以使用GET命令从阶段/位置下载文件
  • 引用外部位置的命名外部阶段(Amazon S3、Google Cloud Storage或Microsoft Azure(
  • 外部位置(Amazon S3、Google Cloud Storage或Microsoft Azure(

格式类型选项(formatTypeOptions(

  • TYPE=CSV

  • TYPE=JSON-

  • TYPE=镶木地板


将数据从表直接卸载到外部位置中的文件

谷歌云存储

使用名为myint:的参考存储集成访问参考地面军事系统存储桶

COPY INTO 'gcs://mybucket/unload/'
FROM mytable
STORAGE_INTEGRATION = myint
FILE_FORMAT = (FORMAT_NAME = my_csv_format);

相关:为谷歌云存储配置集成

最新更新