我已经编写了一个基于云存储触发器的云函数。我有10-15个文件以5秒的间隔降落在云存储桶中,它将数据加载到bigquery表中(截断并加载(。
虽然bucket中有10个文件,但我希望云功能以顺序的方式处理它们,即每次处理1个文件,因为所有文件都访问同一个表进行操作。
目前,云功能一次为多个文件触发,并且由于多个文件试图访问同一个表,它在BIgquery操作中失败。
有什么方法可以在云中配置这个功能吗??
提前感谢!
您可以通过使用pubsub和Cloud Function上的最大实例参数来实现这一点。
- 首先,使用谷歌云存储的通知功能,将事件放入PubSub主题中。
- 现在,每当bucket上发生事件时,您都会收到一条消息。如果只想对文件创建进行筛选(对象最终确定(,则可以对订阅应用筛选器。我写了一篇关于这个的文章
- 然后,创建一个HTTP函数(如果要应用过滤器,则需要HTTP函数(,最大实例设置为1。像这样,同一时间只能执行一个函数。所以,没有并发
- 最后,创建一个关于主题的PubSub订阅,不管是否带有过滤器,以在HTTP中调用函数
编辑
多亏了你的代码,我明白发生了什么。实际上,BigQuery是一个声明性系统。当您执行请求或加载作业时,会创建一个作业并在后台工作。
在python中,你可以明确地等待工作的结束,但是,在panda中,我不知道如何!!
我刚刚找到了一个谷歌云页面来解释如何从panda迁移到BigQuery客户端库。正如你所看到的,的末尾有一条线
# Wait for the load job to complete.
job.result()
而不是等待工作结束。
您在_insert_into_bigquery_dwh
函数中做得很好,但在暂存_insert_into_bigquery_staging
函数中却不是这样。这可能导致2个问题:
- dwh函数处理旧数据,因为触发此作业时暂存尚未完成
- 比方说,如果登台需要10秒,并在";背景";(您不需要在代码中明确等待结束(并且dwh需要1秒钟,下一个文件将在dwh函数结束时处理,即使临时文件继续在后台运行。这就引出了你的问题
您描述的体系结构与您链接的文档中的体系结构不同。请注意,在流程图和代码示例中,存储事件触发云函数,该函数将数据直接流式传输到目标表。由于BigQuery允许多个流式插入作业,因此可以同时执行多个函数而不会出现问题。在您的用例中,用于加载写截断以进行数据清理的中间表会产生很大的差异,因为每次执行都需要前一次执行才能完成,因此需要顺序处理方法。
我想指出的是,PubSub不允许配置消息的发送速率,如果有10条消息到达主题,它们都将发送给订阅者,即使一次处理一条。由于上述原因,将函数限制在一个实例可能会导致开销,并可能增加延迟。也就是说,由于预计的工作量是每天15-30个文件,因此上述问题可能不是什么大问题。
如果您想进行并行执行,可以尝试为每条消息创建一个新表,并使用table.expires(exp_datetime)
setter方法为其设置一个短的截止日期,这样多个执行就不会相互冲突。这是相关的图书馆参考资料。否则纪尧姆的伟大回答将彻底完成任务。