每个人
请帮帮我。
我写了apache-flink-streraming作业,它从apachekafka读取json消息(每秒500-1000条消息(,在POJO中对它们进行反序列化,并执行一些操作(按进程接收器筛选关键字(。我使用了带有ExactlyOnce语义的RocksDB状态后端。但我不明白我需要设置哪个检查点间隔
有些论坛的人写的大多是1000或5000毫秒。我试着设置间隔10ms、100ms、500ms、1000ms、5000ms。我没有注意到任何差异。
有两个因素支持合理的小检查点间隔:
(1( 如果您使用的是执行两阶段事务提交的接收器,如Kafka或StreamingFileSink,那么这些事务将仅在检查点期间提交。因此,作业输出的任何下游消费者都将经历由检查点间隔控制的延迟。
请注意,使用Kafka不会遇到这种延迟,除非您已经采取了端到端只使用一次语义所需的所有步骤。这意味着您必须在Kafka生产者中设置Semantic.EXACTLY_ONCE
,并将下游消费者中的isolation.level
设置为read_committed
。如果要这样做,还应该将transaction.max.timeout.ms
增加到默认值之外(即15分钟(。有关详细信息,请参阅文档。
(2( 如果您的作业失败并且需要从检查点恢复,则输入将被重新缠绕到检查点中记录的偏移量,并且处理将从那里恢复。如果检查点间隔很长(例如,30分钟(,那么您的工作可能需要很长时间才能恢复到再次近乎实时地处理事件的状态(假设您正在处理实时数据(。
另一方面,检查点操作确实会增加一些开销,因此频繁执行检查点操作会对性能产生影响。
除了@David描述的要点之外,我的建议是使用以下功能来配置检查点时间:
StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
通过这种方式,您可以保证您的工作能够取得一些进展,以防状态变得比计划的更大或设置检查点的存储速度较慢。
我建议阅读有关Tuning Checkpoint的Flink文档,以更好地理解这些场景。