问题:Flink 应用程序未接收和处理来自 Kinesis 连接器的事件,该连接器在关闭时生成(由于重新启动(
我们有以下 Flink 环境设置
env.enableCheckpointing(1000ms);
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause);
env.getCheckpointConfig().setCheckpointTimeout(timeOut);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
和 Kinesis 具有以下初始配置
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"LATEST");
当我更改 Kinesis 配置以回复事件时
,令人不安的是,即kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
Flink 正在从 Kinesis 接收所有缓冲记录(这包括在事件 Flink 应用程序关闭之前、期间和之后生成的事件(并对其进行处理。因此,此行为违反了 Flink 应用程序的"恰好一次"属性。
有人可以指出我遗漏的一些明显的东西吗?
Flink Kinesis 连接器确实将分片序列号存储在该状态中,以便恰好一次处理。
从您的描述来看,似乎在您的作业"重新启动"中,不遵守检查点状态。
只是首先消除显而易见的: 您的作业如何从重新启动中恢复? 您是从保存点恢复,还是从以前的检查点自动完成此重新启动?
如果您想使用检查点来跟踪流中消费者的流行音乐,那么前面的答案是一个不错的选择。
这是一个具有更多控制权的替代方案。您可以尝试使用 AT_TIMESTAMP 作为 Flink Kinesis 连接器中的STREAM_INITIAL_POSITION配置选项。
此设置需要一个配置选项STREAM_INITIAL_TIMESTAMP,这是您需要从 Kinesis 读取消息的时间戳。
时间戳值可以通过多种方式维护 - 用于更新文本文件的接收器、用于在外部数据库(如 DynamoDB(中更新的接收器(由启动脚本手动提供(等。
当 Flink 应用程序重新启动时,提供上次处理的时间戳作为运行时参数,并在 Kinesis 使用者的配置中使用它。
您的配置将如下所示:
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
Double startTimeStamp = 1459799926.480; //Parameterize this!
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, startTimeStamp);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
"kinesis_stream_name", new SimpleStringSchema(), consumerConfig));