Flink KeyedCoProcessFunction working with state



I am using a KeyedCoProcessFunction to enrich the main data stream with data from another stream.

Code:

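    import com.typesafe.scalalogging.LazyLogging
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.util.Collector

    // PacketData, AssetCommandState, AssetData and CREATE come from the application's own code.

    object AssetDataEnrichment {
      // Declared in the companion object instead of inside the function class, so
      // state values do not carry a hidden reference to the enclosing function instance.
      case class AssetStateDoc(assetId: Option[String])
    }

    class AssetDataEnrichment
        extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]
        with LazyLogging {
      import AssetDataEnrichment.AssetStateDoc

      // Keyed state: Flink keeps one independent value per key of the connected streams
      private var associatedDevices: ValueState[AssetStateDoc] = _

      override def open(parameters: Configuration): Unit = {
        val associatedDevicesDescriptor =
          new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
        associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
      }

      // Main stream: enrich a packet with the assetId stored for the current key
      override def processElement1(
          packet: PacketData,
          ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
          out: Collector[AssetData]): Unit = {
        // value() returns null when nothing has been stored yet for the current key
        val state = Option(associatedDevices.value).getOrElse(AssetStateDoc(None))

        // Some/None is exhaustive, so the former `case _` branch was unreachable and is dropped
        state.assetId match {
          case Some(assetId) =>
            logger.debug(s"There is state for ${packet.tag.externalId} = $assetId")
            out.collect(AssetData(assetId, packet.tag.externalId.get, packet.toString))
          case None =>
            logger.debug(s"No state for a packet ${packet.tag.externalId}")
        }
      }

      // Side stream: commands that set the state for the current key
      override def processElement2(
          value: AssetCommandState,
          ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
          out: Collector[AssetData]): Unit = {
        value.command match {
          case CREATE =>
            logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
            logger.debug(s"current state is ${associatedDevices.value()}")
            associatedDevices.update(AssetStateDoc(Some(value.assetId)))
            logger.debug(s"new state is ${associatedDevices.value()}")
          case _ =>
            logger.error("Got unknown AssetCommandState command")
        }
      }
    }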

processElement2() works well: it receives data and updates the state.
But in processElement1() I always hit case None => logger.debug(s"No state for a packet ${packet.tag.externalId}"),

even though I expect a value to have been set in processElement2.

As an example, I used this guide: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/

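processElement1 and processElement2 do share state, but keep in mind that this is key-partitioned state. This means that a value set in processElement2 while processing a given value v2 will only be seen in processElement1 during a later call in which the value v1 has the same key as v2.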
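Because the state is partitioned by key, the two key selectors passed to `connect(...).keyBy(...)` must produce equal keys for records that belong together. The question does not show its `keyBy` call, so the following is only a plain-Scala model (no Flink dependency, not the Flink API). `KeyedValueStateModel`, `KeyMismatchDemo`, `commandKey` and `packetKey` are hypothetical names. The model shows how a mismatch makes `processElement1` always see empty state, for example keying one side by an `Option`'s `toString` ("Some(dev-1)" instead of "dev-1").

```scala
import scala.collection.mutable

// Plain-Scala model of Flink key-partitioned ValueState (NOT the Flink API):
// each key owns an independent slot, so a value written while the current key
// is k1 is invisible while the current key is k2.
final class KeyedValueStateModel[K, V] {
  private val slots = mutable.Map.empty[K, V]

  // Like ValueState.value() for the current key; None stands in for Flink's null.
  def value(currentKey: K): Option[V] = slots.get(currentKey)

  // Like ValueState.update(v) for the current key.
  def update(currentKey: K, v: V): Unit = slots.update(currentKey, v)
}

object KeyMismatchDemo {
  // Hypothetical key selectors: the command stream is keyed by the plain id,
  // while the packet stream is (by mistake) keyed by an Option's toString.
  def commandKey(id: String): String = id
  def packetKey(externalId: Option[String]): String = externalId.toString

  def run(): (Option[String], Option[String]) = {
    val associatedDevices = new KeyedValueStateModel[String, String]
    // "processElement2": the current key is "dev-1"
    associatedDevices.update(commandKey("dev-1"), "asset-42")
    // "processElement1" with the mismatched key "Some(dev-1)" finds nothing
    val miss = associatedDevices.value(packetKey(Some("dev-1")))
    // "processElement1" with a key equal to the command key finds the value
    val hit = associatedDevices.value(Some("dev-1").get)
    (miss, hit)
  }
}
```

Here `KeyMismatchDemo.run()` returns `(None, Some("asset-42"))`: the value exists, but only under the key that `processElement2` was called with.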

Also keep in mind that you have no control over the race between the two streams feeding processElement1 and processElement2.
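Since the arrival order across the two inputs is not controlled, a packet can reach processElement1 before the CREATE command for its key has reached processElement2. One common remedy, similar in spirit to the RidesAndFares exercise, is to keep whatever arrives first in keyed state and emit once the other side shows up. The following is a plain-Scala model of that idea, not Flink code. `BufferingEnricherModel`, `onPacket` and `onCreate` are hypothetical names. In a real function the two maps would, by assumption, become a per-key ValueState and ListState, and you would probably also want a timer or state TTL to clear buffers for keys whose command never arrives.

```scala
import scala.collection.mutable

// Plain-Scala model (no Flink) of buffering packets that arrive before the
// enrichment state for their key. In a real KeyedCoProcessFunction these maps
// would be keyed state (e.g. ValueState for the assetId, ListState for pending packets).
final class BufferingEnricherModel {
  private val assetIdByKey = mutable.Map.empty[String, String]
  private val pendingByKey = mutable.Map.empty[String, mutable.ListBuffer[String]]

  // "processElement1": enrich immediately if state exists, otherwise buffer the packet.
  def onPacket(key: String, packet: String): Seq[(String, String)] =
    assetIdByKey.get(key) match {
      case Some(assetId) => Seq(assetId -> packet)
      case None =>
        pendingByKey.getOrElseUpdate(key, mutable.ListBuffer.empty[String]) += packet
        Seq.empty
    }

  // "processElement2": store the state, then flush anything buffered for this key.
  def onCreate(key: String, assetId: String): Seq[(String, String)] = {
    assetIdByKey.update(key, assetId)
    val flushed = pendingByKey.remove(key).map(_.toList).getOrElse(Nil)
    flushed.map(p => assetId -> p)
  }
}
```

For example, two packets for "dev-1" that arrive before its CREATE command are emitted, in arrival order, as soon as the command is processed; later packets for that key are enriched immediately.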

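The RidesAndFares exercise in the official Apache Flink training is all about learning to use this part of the API. https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/ is the home page for the corresponding tutorial.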
