我们在Scala中使用Flink来路由和转换分析管道中的Protobuf事件(使用scalabb(。我有一个"PlayStreams"的数据流,其模式如下:
message PlayStream {
optional PlayerEvent player_event = 1;
optional BlockAccountIPEvent block_account_ip_event = 2;
}
由此生成的事例类具有类型为签名Option[PlayerEvent]
的playerEvent
成员。
我想将数据流转换为PlayerEvents,过滤掉任何没有它们的数据流。我是Scala的新手,所以我不知道如何习惯地做到这一点。我目前工作良好:
// in main()
getDataStream(name, env, config.get("KafkaSource"))
.keyBy[String](PlayStreamFunctions.key(_))
.map{ _.getPlayerEvent }
.filter(filterDefaultPlayerEvents(_))
def filterDefaultPlayerEvents(playerEvent: PlayerEvent): Boolean = {
playerEvent match {
case PlayerEvent.defaultInstance => false
case _ => true
}
}
这是因为生成的类中的getPlayerEvent
只是playerEvent.getOrElse(PlayerEvent.defaultInstance)
,并且我们不将默认实例用于任何事情。然而,创建一堆对defaultInstance的引用,却在下一步中立即过滤掉,这感觉很奇怪。有没有办法避免这种我看不到的情况?
我想澄清一下,我在Flink下确定了这个问题的范围,因为所有的映射函数都是Flink特定的实现。我意识到flatMap
是可用的,并且考虑到映射操作比Options的模式匹配更惯用,我采用了这个实现:
getDataStream(name, env, config.get("KafkaSource"))
.keyBy[String](PlayStreamFunctions.key(_))
.flatMap{ _.playerEvent.toList }
.flatMap(toFlatPlayerEvent(_))
由于如果Option不存在,toList
将返回一个空列表,如果存在,则返回一个带值的一元列表,因此它们之间的平面映射解决了我的问题。