我们有一个用例导致我写这篇文章,我相信你们中的许多人都会面临这种情况。当时的情况是通过一个Talend作业将多个集合从MongoDB迁移到Snowflake数据库中,并将集合的顶级节点保留为Snowflak表中的单个字段。
现在,我们知道Talend不支持MongoDB源的动态模式,因为MongoDB集合不强制执行模式,这意味着我们必须为我们想要接收的每个现有/新集合创建单独的作业/子作业。我们还必须重新设计作业,以备将来在文档中进行更改,同时确保它始终有效,因此,我们必须寻找替代方案。
这是一种方法,
第一步:从MongoDB集合中获取所有顶级密钥及其类型。我们使用了与$objectToArray的聚合,将所有顶级键和值对转换为文档数组,然后使用$展开和$$group与$addToSet,以在整个集合中获得不同的键和值类型。
{
"_id" : "1",
"keys" : [
"field1~string",
"field2~object",
"filed3~date",
"_id~objectId"
]
}
第二步:在Mongo数据类型和Snowflake数据类型之间创建一对一映射。我们已经创建了一个称为">dataTypes";以存储此信息。或者,该信息可以存储在表格或文件等中。
java.util.Map<String,String> dataTypes = new java.util.HashMap<String,String>();
dataTypes.put("string","VARCHAR");
dataTypes.put("int","NUMBER");
dataTypes.put("objectId","VARCHAR");
dataTypes.put("object","VARIANT");
dataTypes.put("date","TIMESTAMP_LTZ");
dataTypes.put("array","VARCHAR");
dataTypes.put("bool","BOOLEAN");
第三步:将键与Snowflake进行比较:首先,我们查询SnowflakeINFORMATION_SCHEMA表是否存在,如果不存在,我们创建表,如果存在,我们检查文档中字段的更改,并在Snowflak表中添加或修改这些列。DDL脚本是通过使用";数据类型映射";在第二步中,并在第一步中的密钥上迭代
第四步:使用mongoexport命令将数据从MongoDB卸载到本地文件系统:
mongoexport --db <databaseName> --collection <collectionName> --type=csv --fields=<fieldList> --out <filename>
是根据步骤一中的键准备的。
第五步:使用PUT命令,使用Snowsql将.csv文件从本地文件系统暂存到雪花暂存位置。
snowsql -d <database> -s <schema> -o exit_on_error=true -o log_level=DEBUG -q 'put <fileName> @<internalStage> OVERWRITE=TRUE';
步骤六:将数据从暂存位置加载到雪花表
COPY INTO <tableName> FROM @<internalStage>
[file_format=<fileFormat>] [pattern=<regex_pattern>]
在这里,指定file_format和pattern是可选的,我们使用了正则表达式,因为我们在一个雪花阶段中为每个集合暂存多个文件。
步骤七:维护集合列表,该列表可以放在本地文件系统或数据库表中的文件中,在Talend作业中,通过参数化作业中的集合名称、表名称、文件名和暂存名称等,迭代集合列表并通过上述步骤处理每个集合。
一种解决方案是将Mongodb集合的记录加载到variant
类型的Snowflake字段中。然后,创建一个雪花视图,以使用Snowflake的点表示法提取特定关键点。
将数据导出为JSON类型。
mongoexport --type=json --out <filename>
将该导出加载到具有如下结构的表中。
create table collection_name_exports (
data variant, -- This column will contain your export
inserted_at datetime default current_timestamp()
);
根据需要将键提取到视图的列中。
create view collection_name_view as
select
collection_name_exports:key1 as field1,
collection_name_exports:key2 as field2
from collection_name_exports