有2个数据流,时间戳分配和水印生成器定义如下:
val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[B] {
override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
当这两个流在一个操作符中连接时,则来自streamA或streamB的最小水印作为连接操作符的水印。
class CombineAB extends CoProcessFunction[A, B, C] {
override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
}
val streamC: DataStream[C] = streamA.connect(streamB)
.process(new CombineAB)
CombineAB
算子的水印为A
或B
的最小值。根据C
型元素是否标注晚,
但是由于我们没有附加任何时间戳分配给C
,这是否意味着CombineAB
操作符中的元素都没有被标记为延迟?因此,C上的窗口不会有任何延迟的记录被删除?
假设我们像下面这样给C附加一个指定的时间戳和水印生成器,那么这是否意味着a和B的水印被完全忽略,CombineAB
的水印只依赖于C的时间戳字段和C定义的延迟?
streamC.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[C] {
override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
element.updatedTime
}
})
)
是否有一种方法可以将时间戳赋值器附加到C,并且CombineAB
的水印仍然是A
和B
的最小值,并且C的元素根据C的分配的时间戳和CombineAB
的水印标记为后期
更新:CombineAB的精细化实现
几点说明:
forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
很不寻常。任何无序的活动都会迟到。为什么不用forMonotonousTimestamps()
呢?
CombineAB
产生的记录将有时间戳;没有必要对这个流应用assignTimestampsAndWatermarks
。Collector
生成的任何记录的时间戳都是传入记录的时间戳。
如果你在流C上调用assignTimestampsAndWatermarks
,传入的水印将被过滤掉,你需要生成新的水印。