目前,我正在使用DataFlow进行流式处理,将上传的blobs从GCS移动到BigQuery。然而,我发现有几个pub/sub消息指向同一个objectId,这在BigQuery端造成了重复。
我知道GCS通知是保证的至少一次交付,因此对于给定的blob文件,我可能会有重复的pub/sub消息。
我读了Python Beam文档,它可以利用id_label作为唯一的记录标识符。为了基于blob文件名进行重复数据删除,我尝试插入属性。objectId上的id_label参数。然而,它似乎不工作(我仍然收到重复的消息):
| "Read from pub/sub topic" >> ReadFromPubSub( topic=config["prod"]["input_pubsub"], with_attributes=True, id_label="attributes.objectId")
注意:我使用的是Beam Python SDK 2.38.0
这是因为Dataflow端的重复数据删除是尽力而为的。
Dataflow对10分钟内发布到Pub/Sub的消息进行重复数据删除。
看到https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub efficient_deduplication .