与Google DataFlow中的流媒体,无界的PCollection一起工作,该流程源自Cloud PubSub订阅。我们将其用作消防人,将事件连续地传递给Bigtable。交付的一切都表现良好。
我们的问题是,我们有下游的批处理作业,期望在交付后读取一天的数据价值。我想利用窗口和触发来实现副作用,当水印超过一天的阈值时,该副作用将把标记行写入Bigtable,这表明数据流有理由相信大多数事件已经交付(我们没有''t需要有力保证完整性,只是合理的),并且下游处理可以开始。
我们尝试的是将原始事件写出管道中的一个水槽,然后使用窗格中的定时信息窗口进入另一个水槽,以确定水印是否已升级。这种方法的问题在于它再次在原始事件本身上进行操作,这是不可取的,因为它会重复编写事件行。我们可以防止此写入,但是管道中的并行路径仍将在事件的窗流流中运行。
是否有一种有效的方法可以将索回调附加到水印上,以便我们可以在水印前进时执行单个动作?
在事件时间设置计时器并接收回调的一般能力绝对是一个重要的功能请求,以beam-27为主动开发。
但实际上,您将窗口插入FixedWindows.of(Duration.standardDays(1))
的方法似乎只会使用DataFlow Java SDK 1.x的功能来实现您的目标。您可以通过添加触发器AfterPane.elementCountAtLeast(1)
来维护"消防"行为,而不是划分管道。它确实产生了GroupByKey
的成本,但没有复制任何内容。
完整的管道可能看起来像这样:
pipeline
// Read your data from Cloud Pubsub and parse to MyValue
.apply(PubsubIO.Read.topic(...).withCoder(MyValueCoder.of())
// You'll need some keys
.apply(WithKeys.<MyKey, MyValue>of(...))
// Window into daily windows, but still output as fast as possible
.apply(Window.into(FixedWindows.of(Duration.standardDays(1)))
.triggering(AfterPane.elementCountAtLeast(1)))
// GroupByKey adds the necessary EARLY / ON_TIME / LATE labeling
.apply(GroupByKey.<MyKey, MyValue>create())
// Convert KV<MyKey, Iterable<MyValue>>
// to KV<ByteString, Iterable<Mutation>>
// where the iterable of mutations has the "end of day" marker if
// it was ON_TIME
.apply(MapElements.via(new MessageToMutationWithEndOfWindow())
// Write it!
.apply(BigTableIO.Write.to(...);
如果我错过了您的用例的一些细节,请对我的答案发表评论。