在我们的流应用程序中,使用 Flink 1.55 及其表 API,我需要检测和处理后期元素。我无法找到 DataStream API 功能的替代方案 .sideOutputLateData(...(
我尝试在 Flink 文档中搜索 https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/tableApi.html 和谷歌搜索很多,但没有发现任何有用的东西
例:
table
.window(Tumble over windowLengthInMinutes.minutes on 'timeStamp as 'timeWindow)
.groupBy(..fieds list)
.select(..fields)
提供的代码按预期工作。问题是,延迟到达的元素 - 由窗口大小和允许的延迟定义,将被丢弃。有没有办法通过表 API 本机处理这些后期元素?
从 Flink 1.8.0 开始,Table API 目前似乎并不直接支持此功能。解决此问题的一种方法是将表转换为DataStream[Row]
并在此上设置侧输出:
val outputTag = OutputTag[String]("side-output")
val flink = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(flink)
// Make sure the source emits data to the selected side output
tableEnv.registerTableSource(...)
val table = tableEnv.sqlQuery("QUERY")
// Can also be toAppendStream, depending on the underlying table output
val dataStream = tableEnv.toRetractStream(table)
val sideOutputStream = dataStream.getSideOutput(outputTag)
下面是拆分的代码示例:
val mainSource = env.addSource(someSource)
val splitted = mainSource.split(
(x:DataKpi)=> isLate(x.getTimestamp) match {
case false =>List("regular")
case true =>List("late")
}
)
val regularSource= splitted select "regular"
val lateSource= splitted select "late"
regularSource
和 lateSource
是稍后单独处理的新流。在使用这种方法之前,我们遇到了一些口是心非的情况。isLate()
函数是一个自定义函数,用于确定元素是否可能延迟。在我的情况下,此功能使用BoundedOutOfOrdernessTimestampExtractor.getCurrentWatermark.getTimestamp
提供的当前水印信息。
我找到了一个解决方案。我目前正在使用BoundedOutOfOrdernessTimestampExtractor,它提供水印时间戳信息。我使用此信息拆分输入流并单独处理后期流。