在Scala中过滤Flink数据流到可选的子对象



我们在Scala中使用Flink来路由和转换分析管道中的Protobuf事件(使用scalabb(。我有一个"PlayStreams"的数据流,其模式如下:

message PlayStream {
optional PlayerEvent player_event = 1;
optional BlockAccountIPEvent block_account_ip_event = 2;
}

由此生成的事例类具有类型为签名Option[PlayerEvent]playerEvent成员。

我想将数据流转换为PlayerEvents,过滤掉任何没有它们的数据流。我是Scala的新手,所以我不知道如何习惯地做到这一点。我目前工作良好:

// in main()
getDataStream(name, env, config.get("KafkaSource"))
.keyBy[String](PlayStreamFunctions.key(_))
.map{ _.getPlayerEvent }
.filter(filterDefaultPlayerEvents(_))

def filterDefaultPlayerEvents(playerEvent: PlayerEvent): Boolean = {
playerEvent match {
case PlayerEvent.defaultInstance => false
case _ => true
}
}

这是因为生成的类中的getPlayerEvent只是playerEvent.getOrElse(PlayerEvent.defaultInstance),并且我们不将默认实例用于任何事情。然而,创建一堆对defaultInstance的引用,却在下一步中立即过滤掉,这感觉很奇怪。有没有办法避免这种我看不到的情况?

我想澄清一下,我在Flink下确定了这个问题的范围,因为所有的映射函数都是Flink特定的实现。我意识到flatMap是可用的,并且考虑到映射操作比Options的模式匹配更惯用,我采用了这个实现:

getDataStream(name, env, config.get("KafkaSource"))
.keyBy[String](PlayStreamFunctions.key(_))
.flatMap{ _.playerEvent.toList }
.flatMap(toFlatPlayerEvent(_))

由于如果Option不存在,toList将返回一个空列表,如果存在,则返回一个带值的一元列表,因此它们之间的平面映射解决了我的问题。

最新更新