Apache Flink水印行为的TwoInputStreamOperator操作符



有2个数据流,时间戳分配和水印生成器定义如下:

val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[B] {
override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)

当这两个流在一个操作符中连接时,则来自streamA或streamB的最小水印作为连接操作符的水印。

class CombineAB extends CoProcessFunction[A, B, C] {
override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
}
val streamC: DataStream[C] = streamA.connect(streamB)
.process(new CombineAB)

CombineAB算子的水印为AB的最小值。根据C型元素是否标注晚,

但是由于我们没有附加任何时间戳分配给C,这是否意味着CombineAB操作符中的元素都没有被标记为延迟?因此,C上的窗口不会有任何延迟的记录被删除?

假设我们像下面这样给C附加一个指定的时间戳和水印生成器,那么这是否意味着a和B的水印被完全忽略,CombineAB的水印只依赖于C的时间戳字段和C定义的延迟?

streamC.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[C] {
override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
element.updatedTime
}
})
)

是否有一种方法可以将时间戳赋值器附加到C,并且CombineAB的水印仍然是AB的最小值,并且C的元素根据C的分配的时间戳和CombineAB的水印标记为后期


更新:CombineAB的精细化实现

几点说明:

forBoundedOutOfOrderness[A](Duration.ofSeconds(0))很不寻常。任何无序的活动都会迟到。为什么不用forMonotonousTimestamps()呢?

CombineAB产生的记录将有时间戳;没有必要对这个流应用assignTimestampsAndWatermarksCollector生成的任何记录的时间戳都是传入记录的时间戳。

如果你在流C上调用assignTimestampsAndWatermarks,传入的水印将被过滤掉,你需要生成新的水印。

相关内容

  • 没有找到相关文章

最新更新