将数据从无模式数据库迁移到关系数据库:从MongoDB迁移到Snowflake



我们有一个用例导致我写这篇文章,我相信你们中的许多人都会面临这种情况。当时的情况是通过一个Talend作业将多个集合从MongoDB迁移到Snowflake数据库中,并将集合的顶级节点保留为Snowflak表中的单个字段。

现在,我们知道Talend不支持MongoDB源的动态模式,因为MongoDB集合不强制执行模式,这意味着我们必须为我们想要接收的每个现有/新集合创建单独的作业/子作业。我们还必须重新设计作业,以备将来在文档中进行更改,同时确保它始终有效,因此,我们必须寻找替代方案。

这是一种方法,

第一步:从MongoDB集合中获取所有顶级密钥及其类型。我们使用了与$objectToArray的聚合,将所有顶级键和值对转换为文档数组,然后使用$展开$$group$addToSet,以在整个集合中获得不同的键和值类型。

{
"_id" : "1",
"keys" : [ 
"field1~string", 
"field2~object", 
"filed3~date",
"_id~objectId"
]

}

第二步:在Mongo数据类型和Snowflake数据类型之间创建一对一映射。我们已经创建了一个称为">dataTypes";以存储此信息。或者,该信息可以存储在表格或文件等中。

java.util.Map<String,String> dataTypes = new java.util.HashMap<String,String>();
dataTypes.put("string","VARCHAR");
dataTypes.put("int","NUMBER");
dataTypes.put("objectId","VARCHAR");
dataTypes.put("object","VARIANT");
dataTypes.put("date","TIMESTAMP_LTZ");
dataTypes.put("array","VARCHAR");
dataTypes.put("bool","BOOLEAN");

第三步:将键与Snowflake进行比较:首先,我们查询SnowflakeINFORMATION_SCHEMA表是否存在,如果不存在,我们创建表,如果存在,我们检查文档中字段的更改,并在Snowflak表中添加或修改这些列。DDL脚本是通过使用";数据类型映射";在第二步中,并在第一步中的密钥上迭代

第四步:使用mongoexport命令将数据从MongoDB卸载到本地文件系统:

mongoexport --db <databaseName> --collection <collectionName> --type=csv --fields=<fieldList> --out <filename>

是根据步骤一中的键准备的。

第五步:使用PUT命令,使用Snowsql将.csv文件从本地文件系统暂存到雪花暂存位置。

snowsql -d <database> -s <schema> -o exit_on_error=true -o log_level=DEBUG  -q  'put <fileName> @<internalStage> OVERWRITE=TRUE';

步骤六:将数据从暂存位置加载到雪花表

COPY INTO <tableName> FROM @<internalStage> 
[file_format=<fileFormat>] [pattern=<regex_pattern>]

在这里,指定file_format和pattern是可选的,我们使用了正则表达式,因为我们在一个雪花阶段中为每个集合暂存多个文件。

步骤七:维护集合列表,该列表可以放在本地文件系统或数据库表中的文件中,在Talend作业中,通过参数化作业中的集合名称、表名称、文件名和暂存名称等,迭代集合列表并通过上述步骤处理每个集合。

一种解决方案是将Mongodb集合的记录加载到variant类型的Snowflake字段中。然后,创建一个雪花视图,以使用Snowflake的点表示法提取特定关键点。

将数据导出为JSON类型。

mongoexport --type=json --out <filename>

将该导出加载到具有如下结构的表中。

create table collection_name_exports (
data variant,  -- This column will contain your export
inserted_at datetime default current_timestamp()
);

根据需要将键提取到视图的列中。

create view collection_name_view as
select
collection_name_exports:key1 as field1,
collection_name_exports:key2 as field2
from collection_name_exports

最新更新