我指的是带有以下代码的Kafka源连接器的Flink 1.14版本。
我期待以下要求。
- 在应用程序的最新开始时,必须读取Kafka主题的最新偏移量
- 在检查点上,它必须将消耗的偏移量提交给Kafka
- 重新启动后(当应用程序手动终止/系统错误时(,它必须从上一次提交的偏移中进行选择,并且必须消耗使用者滞后和此后的新事件馈送
使用Flink新的KafkaConsumer API(KafkaSource(,我面临以下问题
- 能够满足上述要求,但无法提交检查点上消耗的偏移量(500ms(。它宁愿在2秒或3秒后提交
当您在该2s/3s内手动终止应用程序并重新启动时。由于最后一条使用的消息未提交,因此会读取两次(重复(。
为了交叉检查这个功能,我尝试了FlinkKafka的老消费者API(FlinkKavkaConsumer(。它在那里工作得很好。当消息被立即消费时,它被提交回Kafka。
遵循的步骤
- 设置Kafka环境
- 运行下面的flink代码进行消费。代码包括旧的和新的API。这两个API都将使用Kafka主题并在控制台上打印
- 推送一些关于卡夫卡主题的消息
- 在推送一些消息后,在控制台中看到这些消息后,终止Flink作业
- 检查两个API的kafka使用者组。新的flink消费者api的组id(test1(消费者滞后>0,而不是旧的使用者api的组id(older_test1(
- 当您重新启动Flink作业时,您可以看到这些未提交的消息在控制台中从新的Flink kafka-consumer API中可见,从而导致重复的消息
如果我缺少什么或需要添加任何属性,请提出建议。
@Test
public void test() throws Exception {
System.out.println("FlinkKafkaStreamsTest started ..");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.enableCheckpointing(500);
env.setParallelism(4);
Properties propertiesOld = new Properties();
Properties properties = new Properties();
String inputTopic = "input_topic";
String bootStrapServers = "localhost:29092";
String groupId_older = "older_test1";
String groupId = "test1";
propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
/******************** Old Kafka API **************/
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
new KRecordDes(),
propertiesOld);
flinkKafkaConsumer.setStartFromGroupOffsets();
env.addSource(flinkKafkaConsumer).print("old-api");
/******************** New Kafka API **************/
KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
.setBootstrapServers(bootStrapServers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("enable.auto.commit", "false")
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperties(properties)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));
KafkaSource<String> kafkaSource = sourceBuilder.build();
SingleOutputStreamOperator<String> source = env
.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
source.print("new-api");
env.execute();
}
static class KRecordDes implements KafkaDeserializationSchema<String>{
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
return new String(consumerRecord.value());
}
}
注意:我还有其他要求,我希望Flink Kafka绑定源代码读取器在同一代码中,它在新的API(KafkaSource(中可用。
来自Kafka的文档来源:
注意,Kafka源不依赖于容错。提交偏移量仅用于公开进度消费者和消费群体的监控。
当Flink作业从失败中恢复时,它将从最新成功的检查点恢复状态,而不是在broker上使用已提交的偏移量,并从该检查点中存储的偏移量恢复消耗,因此检查点之后的记录将为";重放的";一点点由于您使用的是打印接收器,它不支持一次语义,因此您将看到重复的记录,这些记录实际上是最近一次成功检查点之后的记录。
对于您提到的偏移提交延迟2-3秒,这是因为SourceReaderBase
的实现。简而言之,SplitFetcher
管理一个任务队列,当一个偏移提交任务被推入队列时,它不会被执行,直到调用KafkaConsumer#poll()
的正在运行的获取任务超时。如果交通量很小,延误时间可能会更长。但请注意,这不会影响正确性:KafkaSource不使用提交的偏移量进行容错。