我有一个流媒体过程基本上看起来像这样
Stream(Int, Boolean, Int).Keyby(0, 1).Window().process()
关键点是我想定义一个组合键,然后处理它。但是,如果我使用keyby(0, 1)
和process(... Key: (Int, Boolean), ...)
,进程中的密钥类型总是提示错误。我试图定义keyby(_._1, _._2)
,但不正确。
那么,无论如何使用 scala 定义一个组合键,以便我可以推断出键类型,例如在下面的过程函数中(Int, Boolean)
?
提前感谢!
问题是input.keyBy(0, 1).timeWindow(Time.days(1))
创建了一个KeyedStream[(Int, Boolean, Int), Tuple]
,其中Tuple
是 Flink 的元组类。这也将是process
函数的关键参数的类型。为了访问Tuple
的字段,您需要调用tuple.[T]getField(idx)
,T
是字段的类型。
如果你想让一个 Scala 元组作为ProcessWindowFunction
的键,你需要定义一个KeySelector
。以下代码片段可以解决问题:
input
.keyBy(a => (a._1, a._2))
.timeWindow(Time.days(1))
.process(new ProcessWindowFunction[(Int, Boolean, Int), Int, (Int, Boolean), TimeWindow] {
override def process(key: (Int, Boolean), context: Context, elements: Iterable[(Int, Boolean, Int)], out: Collector[Int]): Unit = {
out.collect(key._1)
}
})