简短描述:数据流多次处理同一输入元素,甚至同时并行处理(因此这不是数据流的失败重试内置机制,因为以前的过程没有失败(。
详细描述:管道获取pubsub消息,其中存储GCS文件的路径。在下一步(DoFn类(中,这个文件是逐行打开和读取的,所以有时对于非常大的文件来说,这是一个漫长的过程,(每个文件(需要长达1个小时。
很多时候(非常经常(这些大文件同时处理。我根据日志消息看到,第一个进程已经加载了50万行,另一个进程加载了30万行,第三个进程刚刚启动,所有这些都与同一个文件相关,并且都基于同一个pubsub消息(相同的message_id(。
另外,pubsub队列图表很难看,这些消息没有得到确认,所以未确认的图表不会减少。
知道发生了什么事吗?你经历过类似的事情吗?
我想强调的是,这与失败和重试过程无关。如果第一个进程失败,第二个进程为同一个文件启动,这是正常的,也是意料之中的事。意外的是,如果这两个进程同时存在。
当在Cloud storage
上添加文件并向Pub Sub
发出自动通知时,可以发送多个通知:
- OBJECT_FINALIZE Sent when a new object (or a new generation of an existing object) is successfully created in the bucket. This includes copying or rewriting an existing object. A failed upload does not trigger this event.
- OBJECT_METADATA_UPDATE Sent when the metadata of an existing object changes.
...
pubsub通知文档
您可以访问Beam
中PubSubMessage
的attributes
,并仅筛选属性为event type
和OBJECT_FINALIZE
值的消息。
在这种情况下,Dataflow
作业将只处理每个文件一条消息,然后DoFn
将打开此文件并只处理元素一次。
以下是一种可能的可能性:
- 正在读取该文件;融合的";从Cloud Pubsub读取消息,以便在将结果保存到Dataflow的内部存储之前进行一个小时的处理,并且可以确认消息
- 由于您的处理时间太长,Cloud Pubsub将再次传递消息
- Dataflow不可能取消您的DoFn处理,因此您将看到它们同时处理,即使其中一个已过期,并且在处理完成时将被拒绝
您真正想要的是对大文件读取进行拆分和并行化。Beam可以很容易地做到这一点(目前我认为这是唯一可以做到的框架(。您将文件名传递给TextIO.readFiles()
转换,对每个大文件的读取将被拆分并并行执行,并且将有足够的检查点,以便在pubsub消息过期之前对其进行确认。
您可以尝试在PubsubIO.read()
和处理之间放置一个Reshuffle