如何在 Flink of Scala 中定义 KeySelector?



我有一个流媒体过程基本上看起来像这样

Stream(Int, Boolean, Int).Keyby(0, 1).Window().process()

关键点是我想定义一个组合键,然后处理它。但是,如果我使用keyby(0, 1)process(... Key: (Int, Boolean), ...),进程中的密钥类型总是提示错误。我试图定义keyby(_._1, _._2),但不正确。

那么,无论如何使用 scala 定义一个组合键,以便我可以推断出键类型,例如在下面的过程函数中(Int, Boolean)

提前感谢!

问题是input.keyBy(0, 1).timeWindow(Time.days(1))创建了一个KeyedStream[(Int, Boolean, Int), Tuple],其中Tuple是 Flink 的元组类。这也将是process函数的关键参数的类型。为了访问Tuple的字段,您需要调用tuple.[T]getField(idx)T是字段的类型。

如果你想让一个 Scala 元组作为ProcessWindowFunction的键,你需要定义一个KeySelector。以下代码片段可以解决问题:

input
.keyBy(a => (a._1, a._2))
.timeWindow(Time.days(1))
.process(new ProcessWindowFunction[(Int, Boolean, Int), Int, (Int, Boolean), TimeWindow] {
override def process(key: (Int, Boolean), context: Context, elements: Iterable[(Int, Boolean, Int)], out: Collector[Int]): Unit = {
out.collect(key._1)
}
})

相关内容

  • 没有找到相关文章

最新更新