无法在Flink新的Kafka消费者api(1.14)中的检查点上将消费偏移提交给Kafka



我指的是带有以下代码的Kafka源连接器的Flink 1.14版本。

我期待以下要求。

  • 在应用程序的最新开始时,必须读取Kafka主题的最新偏移量
  • 在检查点上,它必须将消耗的偏移量提交给Kafka
  • 重新启动后(当应用程序手动终止/系统错误时(,它必须从上一次提交的偏移中进行选择,并且必须消耗使用者滞后和此后的新事件馈送

使用Flink新的KafkaConsumer API(KafkaSource(,我面临以下问题

  • 能够满足上述要求,但无法提交检查点上消耗的偏移量(500ms(。它宁愿在2秒或3秒后提交

当您在该2s/3s内手动终止应用程序并重新启动时。由于最后一条使用的消息未提交,因此会读取两次(重复(。

为了交叉检查这个功能,我尝试了FlinkKafka的老消费者API(FlinkKavkaConsumer(。它在那里工作得很好。当消息被立即消费时,它被提交回Kafka。

遵循的步骤

  • 设置Kafka环境
  • 运行下面的flink代码进行消费。代码包括旧的和新的API。这两个API都将使用Kafka主题并在控制台上打印
  • 推送一些关于卡夫卡主题的消息
  • 在推送一些消息后,在控制台中看到这些消息后,终止Flink作业
  • 检查两个API的kafka使用者组。新的flink消费者api的组id(test1(消费者滞后>0,而不是旧的使用者api的组id(older_test1(
  • 当您重新启动Flink作业时,您可以看到这些未提交的消息在控制台中从新的Flink kafka-consumer API中可见,从而导致重复的消息

如果我缺少什么或需要添加任何属性,请提出建议。

@Test
public void test() throws Exception {
System.out.println("FlinkKafkaStreamsTest started ..");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.enableCheckpointing(500);
env.setParallelism(4);
Properties propertiesOld = new Properties();
Properties properties = new Properties();
String inputTopic = "input_topic";
String bootStrapServers = "localhost:29092";
String groupId_older = "older_test1";
String groupId = "test1";
propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

/******************** Old Kafka API **************/
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
new KRecordDes(),
propertiesOld);
flinkKafkaConsumer.setStartFromGroupOffsets();
env.addSource(flinkKafkaConsumer).print("old-api");

/******************** New Kafka API **************/
KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
.setBootstrapServers(bootStrapServers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("enable.auto.commit", "false")
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperties(properties)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));
KafkaSource<String> kafkaSource = sourceBuilder.build();
SingleOutputStreamOperator<String> source = env
.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
source.print("new-api");
env.execute();
}
static class KRecordDes implements  KafkaDeserializationSchema<String>{
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
return new String(consumerRecord.value());
}
}

注意:我还有其他要求,我希望Flink Kafka绑定源代码读取器在同一代码中,它在新的API(KafkaSource(中可用。

来自Kafka的文档来源:

注意,Kafka源不依赖于容错。提交偏移量仅用于公开进度消费者和消费群体的监控。

当Flink作业从失败中恢复时,它将从最新成功的检查点恢复状态,而不是在broker上使用已提交的偏移量,并从该检查点中存储的偏移量恢复消耗,因此检查点之后的记录将为";重放的";一点点由于您使用的是打印接收器,它不支持一次语义,因此您将看到重复的记录,这些记录实际上是最近一次成功检查点之后的记录。

对于您提到的偏移提交延迟2-3秒,这是因为SourceReaderBase的实现。简而言之,SplitFetcher管理一个任务队列,当一个偏移提交任务被推入队列时,它不会被执行,直到调用KafkaConsumer#poll()的正在运行的获取任务超时。如果交通量很小,延误时间可能会更长。但请注意,这不会影响正确性:KafkaSource不使用提交的偏移量进行容错。

相关内容

  • 没有找到相关文章

最新更新