不同模式的Google数据流管道



我有一个产品来定义和配置业务工作流。该产品的一部分是表单生成器,使用户可以设置不同的表单。

整个表单数据以以下结构备份在MongoDB上

- form_schemas
{
"_id" : "",
"name" : "",
"account_id" : "",
"fields" : [
{
"label" : "Enter Email",
"name" : "email",
"type" : "text",
"required" : "true",
"hidden" : "false",
"additional_config" : { }
},
{
"label" : "Select DOB",
"name" : "dob",
"type" : "date",
"required" : "true",
"hidden" : "false",
"additional_config" : { }
}
...
]
}
- form_datas
{
"workflow_template_id" : "",
"account_id" : ""
"data" : {
"email" : "xyx@gmail.com",
"dob" : "2001-04-05"
},
"created_at" : "",
"updated_at" : ""
}

如上所述,该表单可以用于各种不同的业务。然而,我正在寻找数据管道,以定期将数据传输到Google Bigquery进行分析。

在BQ方面,我为每个工作流程维护单独的表格

我有一个目前的工作解决方案,它完全写在谷歌云功能上。我有一个谷歌调度程序作业以周期性的间隔运行,调用不同的云功能。云功能在高级中执行以下操作

  • 对每个模式进行迭代
  • 读取自上次运行以来每个模式的数据mongodb(作为游标(
  • 对于每一行数据,运行自定义转换逻辑(这包括转换各种嵌套的数据类型,如网格/查找等(
  • 在谷歌云存储上将每一行转换后的数据直接作为流写入ndjson

上述解决方案为我提供了

  • 完全控制转换
  • 简单的部署

然而,由于这一切都在CF上,我受到每次跑步9分钟的限制。这本质上提出了很多分页要求,尤其是在需要从开始重新生成完整数据的情况下

虽然上述解决方案目前运行良好,但我正在考虑其他无服务器选项,如谷歌数据流。由于我刚开始研究数据流/Apachebeam,我想知道

若我要在光束上写一个流水线,我应该采用的相同方法吗

  1. 提取(逐行(->转换->负荷(GCS(->负载(BQ(

  1. 提取(整个数据为JSON(->加载到GCS->变换(波束(->加载到GCS->加载到BQ

请告诉我是否有更好的选项用于整个数据处理。

通常,这类过程将原始数据写入GCS,然后转换为Bigquery。这样做是为了当您发现转换中的缺陷(这是不可避免的(和需求更改(也是不可避免的(时,您可以用新代码重放数据。

理想情况下,转换之前的步骤通过变更数据捕获(CDC(工具实现自动化。疾病控制与预防中心有很多工具,但Debezium正在取代它,因为它是可靠和免费的。有一个Debezium连接器可以从MongoDB获取数据,还有如何将Debezium CDC放入Bigquery的示例。

如果您要编写将数据放入GCS的代码,我建议您考虑使用ApacheParquet而不是NDJSON作为格式。性能和成本会更好,而且我发现使用数据类型的格式更容易。

最新更新