数据流流作业在同一时间多次处理同一元素



简短描述:数据流多次处理同一输入元素,甚至同时并行处理(因此这不是数据流的失败重试内置机制,因为以前的过程没有失败(。

详细描述:管道获取pubsub消息,其中存储GCS文件的路径。在下一步(DoFn类(中,这个文件是逐行打开和读取的,所以有时对于非常大的文件来说,这是一个漫长的过程,(每个文件(需要长达1个小时。

很多时候(非常经常(这些大文件同时处理。我根据日志消息看到,第一个进程已经加载了50万行,另一个进程加载了30万行,第三个进程刚刚启动,所有这些都与同一个文件相关,并且都基于同一个pubsub消息(相同的message_id(。

另外,pubsub队列图表很难看,这些消息没有得到确认,所以未确认的图表不会减少。

知道发生了什么事吗?你经历过类似的事情吗?

我想强调的是,这与失败和重试过程无关。如果第一个进程失败,第二个进程为同一个文件启动,这是正常的,也是意料之中的事。意外的是,如果这两个进程同时存在。

当在Cloud storage上添加文件并向Pub Sub发出自动通知时,可以发送多个通知:

- OBJECT_FINALIZE   Sent when a new object (or a new generation of an existing object) is successfully created in the bucket. This includes copying or rewriting an existing object. A failed upload does not trigger this event.
- OBJECT_METADATA_UPDATE    Sent when the metadata of an existing object changes.
...

pubsub通知文档

您可以访问BeamPubSubMessageattributes,并仅筛选属性为event typeOBJECT_FINALIZE值的消息。

在这种情况下,Dataflow作业将只处理每个文件一条消息,然后DoFn将打开此文件并只处理元素一次。

以下是一种可能的可能性:

  • 正在读取该文件;融合的";从Cloud Pubsub读取消息,以便在将结果保存到Dataflow的内部存储之前进行一个小时的处理,并且可以确认消息
  • 由于您的处理时间太长,Cloud Pubsub将再次传递消息
  • Dataflow不可能取消您的DoFn处理,因此您将看到它们同时处理,即使其中一个已过期,并且在处理完成时将被拒绝

您真正想要的是对大文件读取进行拆分和并行化。Beam可以很容易地做到这一点(目前我认为这是唯一可以做到的框架(。您将文件名传递给TextIO.readFiles()转换,对每个大文件的读取将被拆分并并行执行,并且将有足够的检查点,以便在pubsub消息过期之前对其进行确认。

您可以尝试在PubsubIO.read()和处理之间放置一个Reshuffle

最新更新