我可以在文档中看到:
flink当前仅提供没有的处理保证 迭代。在迭代作业上启用检查点会导致 例外。为了强制迭代程序的检查点 用户在启用检查点时需要设置一个特殊标志: Env.EnableCheckPointing(Interval,force = true)。
请注意,循环边缘的飞行记录(以及状态 与它们相关的更改)将在失败期间丢失。
这是指批处理作业或迭代流中的迭代,还是两者?
如果它指的是迭代流,则在失败时将可用以下操作员的哪些状态?(例如,从有关使用ConnectedIterativeStreams
跨操作员共享状态的对话中获取,并用.closeWith(stream.broadcast())
终止迭代)。
DataStream<Point> input = ...
ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids = input.iterate().withFeedbackType(Centroids.class)
DataStream<Centroids> updatedCentroids = inputsAndCentroids.flatMap(new MyCoFlatmap())
inputsAndCentroids.closeWith(updatedCentroids.broadcast())
class MyCoFlatmap implements CoFlatMapFunction<Point, Centroid, Centroid>{...}
如果MyCoFlatmap
是CoProcessFunction
而不是CoFlatMapFunction
(意味着它也可以保持状态)?
使用迭代时,限制仅适用于Flink的DataStream
/流api。使用DataSet
/批次API时,没有限制。
使用流迭代时,您实际上不会丢失操作员状态,但是您可能会丢失从操作员通过Loop Edge发送回迭代头部的记录。在您的示例中,如果失败,从updatedCentroids
发送到inputsAndCentroids
的记录可能会丢失。因此,在这种情况下,Flink不能准确保证处理一次。
实际上有一个浮雕改进建议,可以解决这一缺点。但是,它尚未完成。