想象一个案例,输入是一个文件名,我们想使用flinkRichFlatMapFunction
来更新文件的状态和输出行(每个文件包含10k行)。我想知道我应该在哪里更新状态,以确保一次交货。这里有两个解决方案:
// solution 1
class MyOp extends RichFlatMapFunction {
...
def flatMap(filename: String, out: Collector[String]): Unit = {
val state = Option(flinkState.value()).getOrElse(defaultState)
for (line <- read(filename)) {
state.update(line)
flinkState.update(state)
out.collect(line)
}
}
}
// solution 2
class MyOp extends RichFlatMapFunction {
...
def flatMap(filename: String, out: Collector[String]): Unit = {
val state = Option(flinkState.value()).getOrElse(defaultState)
for (line <- read(filename)) {
flinkState.update(state)
out.collect(line)
}
state.update(line)
}
}
就正确性而言,它没有任何区别。在调用用户函数(如RichFlatMapFunction
)的过程中从未发生检查点,因此检查点将反映处理传递给flatMap
方法的事件之前或之后的状态。
就性能而言,解决方案2要好得多。