上下文
目前,我使用Snowflake作为数据仓库,使用AWS的S3作为数据湖。S3上的大多数文件都是Parquet格式的。对于这些,我使用了Snowflake的一个新的有限功能(此处介绍),该功能可以自动检测S3上拼花文件中的模式,我可以使用它生成具有正确列名和推断数据类型的CREATE TABLE
语句。此功能目前仅适用于Apache Parquet、Avro和ORC文件。我想找到一种方法,以实现相同的期望目标,但CSV文件。
我尝试做什么
这就是我目前推断Parquet文件模式的方式:
select generate_column_description(array_agg(object_construct(*)), 'table') as columns
from table (infer_schema(location=>'${LOCATION}', file_format=>'${FILE_FORMAT}'))
但是,如果我尝试将FILE_FORMAT
指定为csv,该方法将失败。
我考虑过的其他方法:
- 将S3上的所有文件传输到parquet(这涉及更多的代码和基础设施设置,所以不是我的首选,尤其是我想在S3上保留一些自然类型的文件)
- 有一个脚本(例如使用Python中的Pandas等库)来推断S3中文件的模式(这也涉及更多的代码,这将很奇怪,因为镶木地板文件是在Snowflake中处理的,但非镶木地板的文件是由aws上的一些脚本处理的)
- 使用Snowflake UDF来推断模式。还没有充分考虑我在那里的选择
期望行为
当一个新的csv文件降落在S3上(在预先存在的STAGE上)时,我想推断模式,并能够使用推断的数据类型生成CREATE TABLE
语句。最好,我想在Snowflake中这样做,因为那里存在现有的上述模式推理解决方案。如果需要,很乐意添加更多信息。
编辑:自从我写这篇文章以来,Snowflake添加了对CSV模式检测的支持:https://docs.snowflake.com/en/sql-reference/functions/infer_schema.虽然存储过程可以工作,并且可能作为其他项目的起点很有用,但那些只寻找模式检测的人应该在链接中使用支持的方法。
更新:我修改了在非类型化(所有字符串类型列)表中推断数据类型的SP,现在它直接适用于Snowflake阶段。项目代码如下所示:https://github.com/GregPavlik/InferSchema
我编写了一个存储过程来帮助实现这一点;然而,它的唯一目标是推断非类型化列的数据类型。它的工作原理如下:
- 将CSV加载到一个表中,其中所有列都定义为varchars
- 通过对新表的查询来调用SP(主要目的是只获取所需的列,并限制行数以保持类型推断时间合理)
- SP调用中还有旧位置和新位置的DB、模式和表——旧位置包含所有varchar,新位置包含推断类型
然后SP将推断数据类型并创建两个SQL语句。一条语句将使用推断的数据类型创建新表。一条语句将使用适当的包装器(如try_multi_timestamp())从非类型化(全varchar)表复制到新表,这是一个UDF,它扩展了try_to_timestamp)以尝试各种常见格式。
我本想扩展它,这样它就根本不需要非类型化的(全varchar)表,但我还没有找到它。既然它出现在这里,我可以回过头来用这个功能更新SP。您可以指定一个直接从后台文件读取的查询,但必须使用$1、$2…和别名作为列名(否则DDL将尝试创建类似$1的列名)。如果查询直接针对一个阶段运行,对于旧的DB、模式和表,您可以放入任何内容,因为这只用于从select语句生成插入。
-- This shows how to use on the Snowflake TPCH sample, but could be any query.
-- Keep the row count down to reduce the time it take to infer the types.
call infer_data_types('select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM limit 10000',
'SNOWFLAKE_SAMPLE_DATA', 'TPCH_SF1', 'LINEITEM',
'TEST', 'PUBLIC', 'LINEITEM');
create or replace procedure INFER_DATA_TYPES(SOURCE_QUERY string,
DATABASE_OLD string,
SCHEMA_OLD string,
TABLE_OLD string,
DATABASE_NEW string,
SCHEMA_NEW string,
TABLE_NEW string)
returns string
language javascript
as
$$
/****************************************************************************************************
* *
* DataType Classes
* *
****************************************************************************************************/
class Query{
constructor(statement){
this.statement = statement;
}
}
class DataType {
constructor(db, schema, table, column, sourceQuery) {
this.db = db;
this.schema = schema;
this.table = table;
this.sourceQuery = sourceQuery
this.column = column;
this.insert = '"@~COLUMN~@"';
this.totalCount = 0;
this.notNullCount = 0;
this.typeCount = 0;
this.blankCount = 0;
this.minTypeOf = 0.95;
this.minNotNull = 1.00;
}
setSQL(sqlTemplate){
this.sql = sqlTemplate;
this.sql = this.sql.replace(/@~DB~@/g, this.db);
this.sql = this.sql.replace(/@~SCHEMA~@/g, this.schema);
this.sql = this.sql.replace(/@~TABLE~@/g, this.table);
this.sql = this.sql.replace(/@~COLUMN~@/g, this.column);
}
getCounts(){
var rs;
rs = GetResultSet(this.sql);
rs.next();
this.totalCount = rs.getColumnValue("TOTAL_COUNT");
this.notNullCount = rs.getColumnValue("NON_NULL_COUNT");
this.typeCount = rs.getColumnValue("TO_TYPE_COUNT");
this.blankCount = rs.getColumnValue("BLANK");
}
isCorrectType(){
return (this.typeCount / (this.notNullCount - this.blankCount) >= this.minTypeOf);
}
isNotNull(){
return (this.notNullCount / this.totalCount >= this.minNotNull);
}
}
class TimestampType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "timestamp";
this.insert = 'try_multi_timestamp(trim("@~COLUMN~@"))';
this.sourceQuery = SOURCE_QUERY;
this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
this.getCounts();
}
}
class IntegerType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "number(38,0)";
this.insert = 'try_to_number(trim("@~COLUMN~@"), 38, 0)';
this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
this.getCounts();
}
}
class DoubleType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "double";
this.insert = 'try_to_double(trim("@~COLUMN~@"))';
this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
this.getCounts();
}
}
class BooleanType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "boolean";
this.insert = 'try_to_boolean(trim("@~COLUMN~@"))';
this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
this.getCounts();
}
}
// Catch all is STRING data type
class StringType extends DataType{
constructor(db, schema, table, column, sourceQuery){
super(db, schema, table, column, sourceQuery)
this.syntax = "string";
this.totalCount = 1;
this.notNullCount = 0;
this.typeCount = 1;
this.minTypeOf = 0;
this.minNotNull = 1;
}
}
/****************************************************************************************************
* *
* Main function *
* *
****************************************************************************************************/
var pass = 0;
var column;
var typeOf;
var ins = '';
var newTableDDL = '';
var insertDML = '';
var columnRS = GetResultSet(GetTableColumnsSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD));
while (columnRS.next()){
pass++;
if(pass > 1){
newTableDDL += ",n";
insertDML += ",n";
}
column = columnRS.getColumnValue("COLUMN_NAME");
typeOf = InferDataType(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD, column, SOURCE_QUERY);
newTableDDL += '"' + typeOf.column + '" ' + typeOf.syntax;
ins = typeOf.insert;
insertDML += ins.replace(/@~COLUMN~@/g, typeOf.column);
}
return GetOpeningComments() +
GetDDLPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW) +
newTableDDL +
GetDDLSuffixSQL() +
GetDividerSQL() +
GetInsertPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW) +
insertDML +
GetInsertSuffixSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD) ;
/****************************************************************************************************
* *
* Helper functions *
* *
****************************************************************************************************/
function InferDataType(db, schema, table, column, sourceQuery){
var typeOf;
typeOf = new IntegerType(db, schema, table, column, sourceQuery);
if (typeOf.isCorrectType()) return typeOf;
typeOf = new DoubleType(db, schema, table, column, sourceQuery);
if (typeOf.isCorrectType()) return typeOf;
typeOf = new BooleanType(db, schema, table, column, sourceQuery); // May want to do a distinct and look for two values
if (typeOf.isCorrectType()) return typeOf;
typeOf = new TimestampType(db, schema, table, column, sourceQuery);
if (typeOf.isCorrectType()) return typeOf;
typeOf = new StringType(db, schema, table, column, sourceQuery);
if (typeOf.isCorrectType()) return typeOf;
return null;
}
/****************************************************************************************************
* *
* SQL Template Functions *
* *
****************************************************************************************************/
function GetCheckTypeSQL(insert, sourceQuery){
var sql =
`
select count(1) as TOTAL_COUNT,
count("@~COLUMN~@") as NON_NULL_COUNT,
count(${insert}) as TO_TYPE_COUNT,
sum(iff(trim("@~COLUMN~@")='', 1, 0)) as BLANK
--from "@~DB~@"."@~SCHEMA~@"."@~TABLE~@";
from (${sourceQuery})
`;
return sql;
}
function GetTableColumnsSQL(dbName, schemaName, tableName){
var sql =
`
select COLUMN_NAME
from ${dbName}.INFORMATION_SCHEMA.COLUMNS
where TABLE_CATALOG = '${dbName}' and
TABLE_SCHEMA = '${schemaName}' and
TABLE_NAME = '${tableName}'
order by ORDINAL_POSITION;
`;
return sql;
}
function GetOpeningComments(){
return `
/**************************************************************************************************************
* *
* Copy and paste into a worksheet to create the typed table and insert into the new table from the old one. *
* *
**************************************************************************************************************/
`;
}
function GetDDLPrefixSQL(db, schema, table){
var sql =
`
create or replace table "${db}"."${schema}"."${table}"
(
`;
return sql;
}
function GetDDLSuffixSQL(){
return "n);";
}
function GetDividerSQL(){
return `
/**************************************************************************************************************
* *
* The SQL statement below this attempts to copy all rows from the string tabe to the typed table. *
* *
**************************************************************************************************************/
`;
}
function GetInsertPrefixSQL(db, schema, table){
var sql =
`ninsert into "${db}"."${schema}"."${table}" selectn`;
return sql;
}
function GetInsertSuffixSQL(db, schema, table){
var sql =
`nfrom "${db}"."${schema}"."${table}" ;`;
return sql;
}
//function GetInsertSuffixSQL(db, schema, table){
//var sql = 'nfrom "${db}"."${schema}"."${table}";';
//return sql;
//}
/****************************************************************************************************
* *
* SQL functions *
* *
****************************************************************************************************/
function GetResultSet(sql){
cmd1 = {sqlText: sql};
stmt = snowflake.createStatement(cmd1);
var rs;
rs = stmt.execute();
return rs;
}
function ExecuteNonQuery(queryString) {
var out = '';
cmd1 = {sqlText: queryString};
stmt = snowflake.createStatement(cmd1);
var rs;
rs = stmt.execute();
}
function ExecuteSingleValueQuery(columnName, queryString) {
var out;
cmd1 = {sqlText: queryString};
stmt = snowflake.createStatement(cmd1);
var rs;
try{
rs = stmt.execute();
rs.next();
return rs.getColumnValue(columnName);
}
catch(err) {
if (err.message.substring(0, 18) == "ResultSet is empty"){
throw "ERROR: No rows returned in query.";
} else {
throw "ERROR: " + err.message.replace(/n/g, " ");
}
}
return out;
}
function ExecuteFirstValueQuery(queryString) {
var out;
cmd1 = {sqlText: queryString};
stmt = snowflake.createStatement(cmd1);
var rs;
try{
rs = stmt.execute();
rs.next();
return rs.getColumnValue(1);
}
catch(err) {
if (err.message.substring(0, 18) == "ResultSet is empty"){
throw "ERROR: No rows returned in query.";
} else {
throw "ERROR: " + err.message.replace(/n/g, " ");
}
}
return out;
}
function getQuery(sql){
var cmd = {sqlText: sql};
var query = new Query(snowflake.createStatement(cmd));
try {
query.resultSet = query.statement.execute();
} catch (err) {
throw "ERROR: " + err.message.replace(/n/g, " ");
}
return query;
}
$$;
你试过STAGES吗?
创建2个阶段。。。一个没有标题,另一个有标题。
请参阅下面的示例。
然后是一点SQL,瞧你的DDL。
唯一的问题-你需要知道列的数量,以放置正确的t.$s.
如果有人能自动化。。。我们会有一个几乎自动的CSV DDL生成器。
显然,一旦有了SQL stmt,只需将create或replace表添加到前面,就可以很好地使用CSV中的所有名称创建表。
:-)
-- create or replace stage CSV_NO_HEADER
URL = 's3://xxx-x-dev-landing/xxx/'
STORAGE_INTEGRATION = "xxxLAKE_DEV_S3_INTEGRATION"
FILE_FORMAT = ( TYPE = CSV SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"' )
-- create or replace stage CSV
URL = 's3://xxx-xxxlake-dev-landing/xxx/'
STORAGE_INTEGRATION = "xxxLAKE_DEV_S3_INTEGRATION"
FILE_FORMAT = ( TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"' )
select concat('select t.$1 ', t.$1, ',t.$2 ', t.$2,',t.$3 ', t.$3, ',t.$4 ', t.$4,',t.$5 ', t.$5,',t.$6 ', t.$6,',t.$7 ', t.$7,',t.$8 ', t.$8,',t.$9 ', t.$9,
',t.$10 ', t.$10, ',t.$11 ', t.$11,',t.$12 ', t.$12 ,',t.$13 ', t.$13, ',t.$14 ', t.$14 ,',t.$15 ', t.$15 ,',t.$16 ', t.$16 ,',t.$17 ', t.$17 ,' from @xxxx_NO_HEADER/SUB_TRANSACTION_20201204.csv t') from
--- CHANGE TABLE ---
@xxx/SUB_TRANSACTION_20201204.csv t limit 1;
INFER_SCHEMA:
自动检测包含半结构化数据的一组暂存数据文件中的文件元数据模式,并检索列定义。
此函数支持Apache Parquet、Apache Avro、ORC、JSON和CSV文件目前正在预览对JSON和CSV文件的支持
。。。
检索mystage阶段中CSV文件的列定义,并使用MATCH_BY_column_NAME:加载CSV文件
-- Create a file format that sets the file type as CSV. CREATE FILE FORMAT my_csv_format TYPE = csv PARSE_HEADER = true; -- Query the INFER_SCHEMA function. SELECT * FROM TABLE( INFER_SCHEMA( LOCATION=>'@mystage/csv/' , FILE_FORMAT=>'my_csv_format' ) );
相关:JSON和CSV的模式检测——预览