如何处理 Flink 的表 API 窗口中的后期元素?



在我们的流应用程序中,使用 Flink 1.55 及其表 API,我需要检测和处理后期元素。我无法找到 DataStream API 功能的替代方案 .sideOutputLateData(...(

我尝试在 Flink 文档中搜索 https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/tableApi.html 和谷歌搜索很多,但没有发现任何有用的东西

例:

table
  .window(Tumble over windowLengthInMinutes.minutes on 'timeStamp as 'timeWindow)
  .groupBy(..fieds list)
  .select(..fields)

提供的代码按预期工作。问题是,延迟到达的元素 - 由窗口大小和允许的延迟定义,将被丢弃。有没有办法通过表 API 本机处理这些后期元素?

从 Flink 1.8.0 开始,Table API 目前似乎并不直接支持此功能。解决此问题的一种方法是将表转换为DataStream[Row]并在此上设置侧输出:

val outputTag = OutputTag[String]("side-output")
val flink = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(flink)
// Make sure the source emits data to the selected side output
tableEnv.registerTableSource(...)
val table = tableEnv.sqlQuery("QUERY")
// Can also be toAppendStream, depending on the underlying table output
val dataStream = tableEnv.toRetractStream(table)
val sideOutputStream = dataStream.getSideOutput(outputTag)

下面是拆分的代码示例:

  val mainSource = env.addSource(someSource)
  val splitted = mainSource.split(
      (x:DataKpi)=> isLate(x.getTimestamp) match {
        case false =>List("regular")
        case true =>List("late")
      }
    )
  val regularSource= splitted select "regular"
  val lateSource=    splitted select "late"

regularSourcelateSource 是稍后单独处理的新流。在使用这种方法之前,我们遇到了一些口是心非的情况。isLate()函数是一个自定义函数,用于确定元素是否可能延迟。在我的情况下,此功能使用BoundedOutOfOrdernessTimestampExtractor.getCurrentWatermark.getTimestamp提供的当前水印信息。

我找到了一个解决方案。我目前正在使用BoundedOutOfOrdernessTimestampExtractor,它提供水印时间戳信息。我使用此信息拆分输入流并单独处理后期流。

相关内容

  • 没有找到相关文章

最新更新