我正在使用flink数据集API进行迭代计算
但每次迭代的结果都是我完整解决方案的一部分
(如果需要更多细节:我在每次迭代中从上到下逐层计算晶格节点,请参阅形式概念分析)
如果我在不保存结果的情况下使用批量迭代的flink数据集API,代码将如下所示:
val start = env.fromElements((0, BitSet.empty))
val end = start.iterateWithTermination(size) { inp =>
val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
(result,result)
}
end.count()
但是,如果我试图在迭代(_.writeAsText())或任何操作中写入部分结果,我将得到错误:
org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?
没有批量迭代的替代方案如下:
var start = env.fromElements((0, BitSet.empty))
var count = 1L
var all = count
while (count > 0){
start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
count = start.count()
all = all + count
}
println("total nodes: " + all)
但这种方法在最小的输入数据上异常缓慢,迭代版本需要<30秒,循环版本需要超过3分钟
我想flink无法创建最佳计划来执行循环
有什么变通办法我应该试试?是否可以对flink进行一些修改以将部分结果保存在hadoop等上。?
不幸的是,目前无法从批量迭代中输出中间结果。您只能在迭代结束时输出最终结果。
此外,正如您正确注意到的,Flink无法有效地展开while循环或for循环,因此这也不起作用。
如果中间结果不是那么大,可以尝试将中间结果附加到部分解决方案中,然后在迭代结束时输出所有结果。TransitiveClosureNaive示例中实现了类似的方法,其中在迭代中发现的路径将在下一个部分解决方案中累积。