我们在GCP中使用pubsub和cloud函数来协调我们的数据工作流程。
我们的工作流程类似于:
工作流_gcp
pubsub1和pubsub3可以在不同的时间(例如凌晨1点和4点(被触发。它们每天都会从外部来源(我们的ETL,Talend(触发。
我们的云函数基本上在BigQuery中执行SQL。
这运行得很好,但我们必须手动创建一个编排数据库来记录函数的开始和结束时间(以回答"函数X执行得好吗?"的问题(。编排逻辑与我们的业务逻辑紧密耦合,因为我们的云函数必须知道之前必须执行哪些函数,之后必须触发哪些pubsub。
因此,我们正在寻找一种将编排逻辑和业务逻辑分离的解决方案。
我发现composer(气流(可能是一个解决方案,但是:
-
它不能以本机方式运行云功能(使用API非常有限,每个项目每100秒调用16次(
-
我们可以在气流中使用BigQuery和BigQuery操作符,但编排和业务逻辑将再次强烈耦合
那么,在我们的案例中,最佳实践是什么?
感谢您的帮助
您可以使用Cloud Composer(Airflow(,并且仍然可以重用大部分现有设置。
首先,您可以保留所有现有的云功能,并使用HTTP触发器(或您喜欢的其他触发器(在Airflow中触发它们。您需要做的唯一更改是在气流中实现PubSub传感器,以便它触发您的云功能(从而确保您可以控制流程的端到端编排(。
您的解决方案将是一个Airflow DAG,它根据PubSub消息触发云功能,如果功能成功,则向Airflow报告,然后,如果两者都成功,则使用HTTP触发器或类似触发器触发第三个云功能。
最后一个注释,它不是立即直观的。Airflow并不意味着运行作业本身,它意味着编排和管理依赖关系。事实上,您使用由气流触发的云功能并不是一种反模式,实际上是一种最佳实践。
在您的情况下,您可以100%重写一些内容并使用BigQuery运算符,因为您不进行任何处理,只触发查询/作业,但这一概念是正确的,最佳实践是利用Airflow确保事情在您需要的时间和顺序发生,而不是自己处理这些事情。(希望这有意义(
作为气流的替代方案,我会考虑"argo工作流"->https://github.com/argoproj/argo
它没有composer所拥有的成本开销,尤其是对于较小的工作负载。
我会:
创建了一个从外部工具读取pubsub消息的部署,并将其部署到kubernetes。
根据消息执行工作流。工作流中的每一步都可以是一个云功能,打包在docker中。
(我会用kubernetes作业取代云功能,然后由工作流触发。(
用docker封装一个云函数并在kuberentes中运行它是非常直接的。
gsutil/bq/gcloud中存在预构建的docker映像,因此您可以创建bash脚本,该脚本使用"bq"命令行来执行bigquery中的内容。