我正在尝试 Flink 并编写了以下示例程序:
object IFJob {
@SerialVersionUID(1L)
final class StringInputFormat extends GenericInputFormat[String] {
val N = 100
var i = 0L
override def reachedEnd(): Boolean = this.synchronized {
i >= N
}
override def nextRecord(ot: String): String = this.synchronized {
i += 1
return (i % 2) + ""
}
}
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val text: DataSet[String] = env.createInput(new StringInputFormat())
val map = text.map {
(_, 1)
}
// map.print()
val by = map.groupBy(0)
val aggregate: AggregateDataSet[(String, Int)] = by.aggregate(Aggregations.SUM, 1)
aggregate.print()
}
}
我正在创建一个StringInputFormat
并并行读取它(默认并行度 = 8)。 当我运行上述程序时,结果在执行之间有所不同,即它们不是确定性的。结果重复 1-8 倍。
例如,我得到以下结果:
// first run
(0,150)
(1,150)
// second run
(0,50)
(1,50)
// third run
(0,200)
(1,200)
预期结果将是
(0,400)
(1,400)
因为在那里StringInputFormat
应该生成 8 乘以 50 条"0"和"1"记录。
我什至将同步添加到输入格式,但没有帮助。
我在 Flink 计算模型中缺少什么?
你观察到的行为是 Flink 如何分配工作给InputFormat
的结果。其工作原理如下:
- 在主节点(作业管理器)上,调用
createInputSplits()
方法,该方法返回一个InputSplit
数组。InputSplit
是要读取(或生成)的数据块。该GenericInputFormat
为每个并行任务实例创建一个InputSplit
。在您的情况下,它会创建 8 个InputSplit
对象,每个对象InputSplit
应生成 50 个"1"
和 50 个"0"
记录。 DataSourceTask
的并行实例在工作线程(任务管理器)上启动。每个DataSourceTask
都有自己的InputFormat
实例。- 启动后,
DataSourceTask
向主服务器请求InputSplit
,并使用InputSplit
调用其InputFormat
的open()
方法。当InputFormat
处理完InputSplit
后,DataSourceTask
向主节点请求一个新的。
在您的情况下,每个InputSplit
都会得到非常快速的处理。因此,DataSourceTasks请求InputSplits为其InputFormats和某些InputFormats处理多个InputSplit
之间存在竞争。由于InputFormat
在打开新InputSplit
时不会重置其内部状态(即设置i = 0
),因此它只会在处理的第一个InputSplit
生成数据。
您可以通过将此方法添加到StringInputFormat
来解决此问题:
override def open(split: GenericInputSplit): Unit = {
super.open(split)
i = 0
}