使用 GenericInputFormat 生成数据时的争用条件



我正在尝试 Flink 并编写了以下示例程序:

object IFJob {
@SerialVersionUID(1L)
final class StringInputFormat extends GenericInputFormat[String] {
val N = 100
var i = 0L
override def reachedEnd(): Boolean = this.synchronized {
i >= N
}
override def nextRecord(ot: String): String = this.synchronized {
i += 1
return (i % 2) + ""
}
}
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val text: DataSet[String] = env.createInput(new StringInputFormat())
val map = text.map {
(_, 1)
}
//        map.print()
val by = map.groupBy(0)
val aggregate: AggregateDataSet[(String, Int)] = by.aggregate(Aggregations.SUM, 1)
aggregate.print()
}
}

我正在创建一个StringInputFormat并并行读取它(默认并行度 = 8)。 当我运行上述程序时,结果在执行之间有所不同,即它们不是确定性的。结果重复 1-8 倍。

例如,我得到以下结果:

// first run 
(0,150) 
(1,150)
// second run 
(0,50) 
(1,50)
// third run 
(0,200) 
(1,200)

预期结果将是

(0,400) 
(1,400)

因为在那里StringInputFormat应该生成 8 乘以 50 条"0"和"1"记录。

我什至将同步添加到输入格式,但没有帮助。

我在 Flink 计算模型中缺少什么?

你观察到的行为是 Flink 如何分配工作给InputFormat的结果。其工作原理如下:

  1. 在主节点(作业管理器)上,调用createInputSplits()方法,该方法返回一个InputSplit数组。InputSplit是要读取(或生成)的数据块。该GenericInputFormat为每个并行任务实例创建一个InputSplit。在您的情况下,它会创建 8 个InputSplit对象,每个对象InputSplit应生成 50 个"1"和 50 个"0"记录。
  2. DataSourceTask的并行实例在工作线程(任务管理器)上启动。每个DataSourceTask都有自己的InputFormat实例。
  3. 启动后,DataSourceTask向主服务器请求InputSplit,并使用InputSplit调用其InputFormatopen()方法。当InputFormat处理完InputSplit后,DataSourceTask向主节点请求一个新的。

在您的情况下,每个InputSplit都会得到非常快速的处理。因此,DataSourceTasks请求InputSplits为其InputFormats和某些InputFormats处理多个InputSplit之间存在竞争。由于InputFormat在打开新InputSplit时不会重置其内部状态(即设置i = 0),因此它只会在处理的第一个InputSplit生成数据。

您可以通过将此方法添加到StringInputFormat来解决此问题:

override def open(split: GenericInputSplit): Unit = {
super.open(split)
i = 0
}

最新更新