我的目标是有一个 Flink 流程序来保留最后的 N 个 id,其中 id 是从事件中提取的。接收器是一个 Cassandra 存储,以便可以随时获取 ID 列表。重要的是,Cassandra在每次事件时都会立即更新。
这可以通过mapWithState
轻松实现(请参阅下面的代码(。但是,此代码存在重要问题。状态由 userid
键控。某些用户可能会活跃一段时间,然后再也不会活动。我担心的是状态存储将永远增长。
如何清理非活动密钥的状态?
case class MyEvent(userId: Int, id: String)
env
.addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties))
.keyBy(_.userId)
.mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) =>
val keepNIds = currentIds match {
case None => Seq(in.id)
case Some(cids) => (cids :+ in.id).takeRight(100)
}
((in.userId, keepNIds), Some(keepNIds))
}
.addSink { in: (Int, Seq[String]) =>
CassandraSink.appDatabase.idsTable.store(...)
}
增长状态是一个重要而正确的观察。如果您的密钥空间正在移动,这肯定会发生。
Flink 1.2.0 添加了解决此问题的ProcessFunction
。ProcessFunction
类似于FlatMapFunction
,但可以访问定时服务。您可以注册在过期时调用 onTimer()
回调函数的计时器。回调可用于清理状态。