Which way of using Flink broadcast state is better?



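I am using Flink version 1.13.1.

The code has been simplified.

I use broadcast state in my project. The broadcast source emits some configuration every 5 minutes. Because a process function can connect to only one broadcast source, I defined a case class that carries all three kinds of configuration.

The case class: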

case class OrderConfBroadcastBean(orderRuleConfig: List[OrderInfoBean],
                                  userSegmentInfo: Map[String, (String, String)],
                                  lacciRegRel: Map[String, Set[String]])

And the code that sets up the broadcast stream:

val orderConfBroadcast = env.addSource(new OrderConfSource(dbConfig, serverConfig.smsRuleRedis))
  .name("order_conf_load")
  .uid("order_conf_load")
  .setParallelism(1)
  .broadcast(new MapStateDescriptor[String, OrderConfBroadcastBean](
    "order_conf_broadcast",
    createTypeInformation[String],
    createTypeInformation[OrderConfBroadcastBean]))

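I would like to know which of the following two ways of using broadcast state in the process function is correct, or which one has better performance and lower memory usage, and why.

First approach: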

class OrderFilterProcess(var userSegmentInfo: Map[String, (String, String)],
                         var orderInfo: List[OrderInfoBean],
                         redisConf: String,
                         var lacciRegRel: Map[String, Set[String]])
  extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {

  override def processElement(regLacci: RegLacciBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext, out: Collector[OrderResultBean]): Unit = {
    userSegmentInfo.get("xxx")
    orderInfo.map(xxx)
  }

  override def processBroadcastElement(value: OrderConfBroadcastBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context, out: Collector[OrderResultBean]): Unit = {
    if (value.orderRuleConfig.nonEmpty) {
      orderInfo = value.orderRuleConfig
    }
    if (value.userSegmentInfo.nonEmpty) {
      userSegmentInfo = value.userSegmentInfo
    }
    if (value.lacciRegRel.nonEmpty) {
      lacciRegRel = value.lacciRegRel
    }
  }
}

Second approach:

class OrderFilterProcess(var userSegmentInfo: Map[String, (String, String)],
                         var orderInfo: List[OrderInfoBean],
                         redisConf: String,
                         var lacciRegRel: Map[String, Set[String]])
  extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {

  // Same descriptor (name and types) as the one passed to .broadcast(...)
  val stateDescriptor = new MapStateDescriptor[String, OrderConfBroadcastBean]("order_conf_broadcast", createTypeInformation[String], createTypeInformation[OrderConfBroadcastBean])

  override def processElement(regLacci: RegLacciBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext, out: Collector[OrderResultBean]): Unit = {
    val state = ctx.getBroadcastState(stateDescriptor)
    // Look up the value in the broadcast bean if one is stored; otherwise fall back to the constructor value
    Option(state.get("order_state")).flatMap(_.userSegmentInfo.get("xxx")).orElse(userSegmentInfo.get("xxx"))
  }

  override def processBroadcastElement(value: OrderConfBroadcastBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context, out: Collector[OrderResultBean]): Unit = {
    // Store the whole bean under a single fixed key
    ctx.getBroadcastState(stateDescriptor).put("order_state", value)
  }
}

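The biggest difference between these two implementations is that in the first one you store the data received from the broadcast stream in variables, which are lost when the job fails, whereas in the second one you use broadcast state, which is checkpointed and restored.

Version 2 has some overhead. You would have to measure it to find out how much, but in both cases the data is held in memory, so the difference should not be large.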
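One behavioural difference in the question's code, apart from checkpointing, is worth noting: the first version updates a field only when the incoming value is non-empty, while the second overwrites the whole stored bean on every broadcast element. If that partial-update behaviour matters, one option is to merge the incoming bean with the stored one before writing it to broadcast state. The following is a minimal sketch in plain Scala, not a definitive implementation. `OrderInfoBean` is a placeholder here because its fields are not shown in the question, and `OrderConfMerge.merge` is a hypothetical helper name.

```scala
// Placeholder for the question's OrderInfoBean; its real fields are not shown (assumption).
case class OrderInfoBean(id: String)

// Same shape as the case class in the question.
case class OrderConfBroadcastBean(orderRuleConfig: List[OrderInfoBean],
                                  userSegmentInfo: Map[String, (String, String)],
                                  lacciRegRel: Map[String, Set[String]])

object OrderConfMerge {
  // Hypothetical helper: for each field, take the update when it is non-empty,
  // otherwise keep the previously stored value. With no previous bean, use the update as-is.
  def merge(previous: Option[OrderConfBroadcastBean],
            update: OrderConfBroadcastBean): OrderConfBroadcastBean =
    previous match {
      case None => update
      case Some(old) =>
        OrderConfBroadcastBean(
          if (update.orderRuleConfig.nonEmpty) update.orderRuleConfig else old.orderRuleConfig,
          if (update.userSegmentInfo.nonEmpty) update.userSegmentInfo else old.userSegmentInfo,
          if (update.lacciRegRel.nonEmpty) update.lacciRegRel else old.lacciRegRel
        )
    }
}

// Possible use inside processBroadcastElement of the second version:
//   val state = ctx.getBroadcastState(stateDescriptor)
//   state.put("order_state", OrderConfMerge.merge(Option(state.get("order_state")), value))
```

Keeping the merge as a pure function means it can be unit-tested without a Flink runtime, and the merged bean is what gets checkpointed.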
