在Flink数据集中保存批量迭代的部分输出的可能性



我正在使用flink数据集API进行迭代计算
但每次迭代的结果都是我完整解决方案的一部分
(如果需要更多细节:我在每次迭代中从上到下逐层计算晶格节点,请参阅形式概念分析)

如果我在不保存结果的情况下使用批量迭代的flink数据集API,代码将如下所示:

val start = env.fromElements((0, BitSet.empty))
val end = start.iterateWithTermination(size) { inp =>
    val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    (result,result)
}
end.count()

但是,如果我试图在迭代(_.writeAsText())或任何操作中写入部分结果,我将得到错误:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

没有批量迭代的替代方案如下:

var start = env.fromElements((0, BitSet.empty))
var count = 1L
var all = count
while (count > 0){
    start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    count = start.count()
    all = all + count
}
println("total nodes: " + all)

但这种方法在最小的输入数据上异常缓慢,迭代版本需要<30秒,循环版本需要超过3分钟
我想flink无法创建最佳计划来执行循环

有什么变通办法我应该试试?是否可以对flink进行一些修改以将部分结果保存在hadoop等上。?

不幸的是,目前无法从批量迭代中输出中间结果。您只能在迭代结束时输出最终结果。

此外,正如您正确注意到的,Flink无法有效地展开while循环或for循环,因此这也不起作用。

如果中间结果不是那么大,可以尝试将中间结果附加到部分解决方案中,然后在迭代结束时输出所有结果。TransitiveClosureNaive示例中实现了类似的方法,其中在迭代中发现的路径将在下一个部分解决方案中累积。

相关内容

  • 没有找到相关文章

最新更新