我在Scala中有一个非常简单的Flink应用程序。我有两条简单的小溪。我正在把我的一个流广播到另一个流。广播流包含规则,只是检查另一个流的元组是否在规则之内。一切正常,我的代码如下:
这是一个无限运行的应用程序。我想知道JVM是否有可能收集我的rules
对象作为垃圾。
有人知道吗?提前感谢。
object StreamBroadcasting extends App {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
val stream = env
.socketTextStream("localhost", 9998)
.flatMap(_.toLowerCase.split("\W+").filter(_.nonEmpty))
.keyBy(l => l)
val ruleStream = env
.socketTextStream("localhost", 9999)
.flatMap(_.toLowerCase.split("\W+").filter(_.nonEmpty))
val broadcastStream: DataStream[String] = ruleStream.broadcast
stream.connect(broadcastStream)
.flatMap(new SimpleConnect)
.print
class SimpleConnect extends RichCoFlatMapFunction[String, String, (String, Boolean)] {
private var rules: Set[String] = Set.empty[String] // Can JVM collect this object after a long time?
override def open(parameters: Configuration): Unit = {}
override def flatMap1(value: String, out: Collector[(String, Boolean)]): Unit = {
out.collect(value, rules.contains(value))
}
override def flatMap2(value: String, out: Collector[(String, Boolean)]): Unit = {
rules = rules.+(value)
}
}
env.execute("flink-broadcast-streams")
}
不,规则集不会被垃圾收集。它将永远存在。(当然,由于您没有使用Flink的广播状态,因此这些规则将在应用程序重新启动后失效。)