Flink:RichMapFunction中的Access Key应用于KeyedStream,用于处理Option键控



当我将RichMapFunction应用于键控流时,我希望将None作为一个关键情况来处理。

例如,我有一个这样的案例类:

case class Foo(a: Option[String], b: Int, acc: Option[Int] = None)

acc是我想用map计算的字段。

我想在流上应用一个有状态映射,所以我有一个RichMapFunction(例如它是一个累加器(:

class Accumulator extends RichMapFunction[Foo, Foo] {
private var sum: ValueState[Int] = _
override def map(input: Foo): Foo = {
val newAcc = Option(sum.value()) match {
case None => input.b
case Some(x) => x + input.b
}
sum.update(newAcc)
Foo(input.a, input.b, Some(newAcc))
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[Int]("accumulator", createTypeInformation[Int])
)
}
}

然后,我的管道被执行:

object ExampleAccumulator extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(Foo(Some("a"), 1, None), Foo(Some("a"), 2, None), Foo(None, 10, None), Foo(None, 6, None))
.keyBy(_.a)
.map(new Accumulator())
.print()
env.execute("ExampleAccumulator")
}

输出为:

Foo(Some(a),1,Some(1))
Foo(Some(a),2,Some(3))
Foo(None,10,Some(10))
Foo(None,6,Some(16))

但当密钥是None时,我想在acc中获得None

是否可以在RichMapFunction中获取密钥?

目前不支持它。该键可以通过KeyContext类中的getCurrentKey((方法获取,该方法在RichMapFunction中未公开。然而,Flink在内部提供了KeyedProcessFunction,它可以在参数Context中返回key。我相信这就是你想要的。

您可以通过Foo的值访问密钥,通过KeySelectorAPI,Scala解决方法:

val selector = scalaKeyedStream
.javaStream 
.asInstanceOf[org.apache.flink.streaming.api.datastream.KeyedStream]
.getKeySelector
scalaKeyedStream.map(in => selector.getKey(in))

您需要将Scala Stream转换为Java,因为Scala API中没有getKeySelector方法,详细信息。

相关内容

  • 没有找到相关文章

最新更新