在apache-flink中,我们应该按每次收集还是按每次输入更新状态



想象一个案例,输入是一个文件名,我们想使用flinkRichFlatMapFunction来更新文件的状态和输出行(每个文件包含10k行)。我想知道我应该在哪里更新状态,以确保一次交货。这里有两个解决方案:

// solution 1
class MyOp extends RichFlatMapFunction {
...
def flatMap(filename: String, out: Collector[String]): Unit = {
val state = Option(flinkState.value()).getOrElse(defaultState)
for (line <- read(filename)) {
state.update(line)
flinkState.update(state)
out.collect(line)
}
}
}
// solution 2
class MyOp extends RichFlatMapFunction {
...
def flatMap(filename: String, out: Collector[String]): Unit = {
val state = Option(flinkState.value()).getOrElse(defaultState)
for (line <- read(filename)) {
flinkState.update(state)
out.collect(line)
}
state.update(line)
}
}

就正确性而言,它没有任何区别。在调用用户函数(如RichFlatMapFunction)的过程中从未发生检查点,因此检查点将反映处理传递给flatMap方法的事件之前或之后的状态。

就性能而言,解决方案2要好得多。

相关内容

  • 没有找到相关文章

最新更新