1) Using the Python connector to connect from Python to Snowflake. 2) An external stage is set up pointing to an S3 bucket. The requirement is to create a table dynamically for each CSV.
E.g. if I have 10 CSVs in the S3 bucket, then 10 different tables should be created dynamically, referencing the external stage.
Sql_query = ?
Sql_query = copy into db.schema.table from external_stage (for the scenario where the table structure has already been created in Snowflake; a sketch of this is given below).
Refer: connecting to Snowflake with Python.
Refer: querying data from Snowflake using Python.
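For the case mentioned in the question, where the table structure already exists in Snowflake, a minimal COPY INTO sketch could look like the following (the stage name, file name, and file-format options here are placeholders, not taken from the actual setup):

copy into db.schema.my_table
  from @my_s3_stage/data_file.csv
  file_format = (type = csv skip_header = 1);

For the fully dynamic case, one table per staged file, see the example below.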
The example below reads a stage containing two files, creates two tables based on the file names in the stage, and then inserts data into each table.
This is what the stage looks like (output truncated to the relevant columns):
list @test_stage;
+------------------------------+------+
| name                         | size |
|------------------------------+------+
| test_stage/data_comma.csv.gz |  128 |
| test_stage/date_data.csv.gz  |  128 |
+------------------------------+------+
Python code to create the tables dynamically and load data into them from the stage:

import snowflake.connector
# Set up the connection properties below.
# Database and schema are provided here, so prefixing those values
# later in the code is not needed - modify as required.
con = snowflake.connector.connect(
    user='user',
    password='password',
    account='ab313.us-east-2.aws',
    warehouse='COMPUTE_WH',
    database='TEST_DB',
    schema='PUBLIC'
)
try:
    cur = con.cursor()
    # Query the stage and get the file names.
    # Split each file name to extract the part NOT containing the extensions.
    cur.execute("select DISTINCT split_part(metadata$filename,'.',1) as col1, metadata$filename as col2 from @test_stage;")
    for (col1, col2) in cur:
        # Table creation is driven by the files found in the stage.
        # The staged files here have no header row (loaded with skip-header),
        # so the column names are hard-coded.
        # Even if we tried to get the column names from a header row
        # (if one were included) in the staged file, the data types would
        # still need to be hard-coded, or perhaps fetched from another
        # table, if complete dynamism is needed.
        cur.execute("create transient table " + col1 + " (id number);")
        # Copy data into the table created above from the stage file.
        cur.execute("copy into " + col1 + " from (select $1 from @test_stage/" + col2 + ");")
finally:
    con.close()
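After the script runs, each staged file should end up with a matching transient table loaded from it (for the listing above that would be DATA_COMMA and DATE_DATA, assuming metadata$filename returns the bare file names). A quick sanity check in SQL, using those assumed names:

show tables like 'DAT%';
select count(*) from data_comma;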