从snowflake的外部阶段动态创建表



1)使用Python连接器执行Python到雪花的数据连接2)外部阶段设置指向S3桶需求是基于每个CSV创建一个动态表。

Eg-我在s3桶中有10个CSV,然后应该动态创建10个不同的表,引用外部阶段

Sql_query = ?

Sql_query= copy到db.schema.table中external_stage(在已经在snowflake中创建表结构的场景中)

参考python与雪花的连接。

参考使用python查询snowflake的数据。

下面的示例将读取一个包含两个文件和的阶段根据stage和中的文件名创建两个表然后在每个表中插入数据。

这是舞台的样子[输出截断,按列排列]-

list @test_stage;
+------------------------------+------+
| name                         | size |
|------------------------------+------+
| test_stage/data_comma.csv.gz |  128 |
| test_stage/date_data.csv.gz  |  128 |
+------------------------------+------+

创建动态表和从stage

插入数据的Python代码
import snowflake.connector
# Below setting up connection properties
# Providing all values for database, schema
# therefore prefixing those values later in code
# is not needed - can be modified as needed.
con = snowflake.connector.connect(
user='user',
password='password',
account='ab313.us-east-2.aws',
warehouse='COMPUTE_WH',
database='TEST_DB',
schema='PUBLIC'
)
try:
cur = con.cursor()

# Query stage and get file-names
# Split file name to extract part NOT containing extensions

cur.execute("select DISTINCT split_part(metadata$filename,'.',1) as col1, metadata$filename as col2 from @test_stage;")
for (col1,col2) in cur:
# Dynamic table creation is based on files in stage.
# Here stage files are with skip-header, so column name are 
# hard-coded.
# Even if we try and get column-names from header row 
# (if included) in stage file, data-types still needs 
# hard-coding or perhaps fetching from other table 
# if needed complete dynamism.
cur.execute("create transient table "+col1+"(id number);")

# Copy data into table created above from stage file
cur.execute("copy into "+col1+" from (select $1 from @test_stage/"+col2+");")    
finally:
con.close()

相关内容

  • 没有找到相关文章

最新更新