我有一个Flink批处理作业,它从kafka读取并写入S3。此工作的当前策略是读取
From:Kafka中提交的偏移量(如果没有提交的偏移,则从最早的偏移量读取(
到:作业开始时的最新偏移量。
所以我基本上有我的卡夫卡消费者如下:
KafkaSource.<T>builder()
.setBootstrapServers(resolvedBootstrapBroker)
.setTopics(List.of("TOPIC_0"))
.setGroupId(consumerGroupId)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(deserializationSchema)
.setBounded(OffsetsInitializer.latest())
.setProperties(additionalProperties)
.build();
我还禁用了检查点,在检查点上提交偏移量,因为检查点在批处理模式下无论如何都是禁用的。我启用了自动提交,如下所示:
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.setProperty("commit.offsets.on.checkpoint", "false");
我遇到的问题是,补偿并没有被卡夫卡所接受。我知道Flink kafka消费者不使用这些补偿。但在我的情况下,由于这是一个批量作业,我需要保证补偿被提交回卡夫卡。目前,我们的记录很少,作业只运行1241毫秒。我甚至尝试过每100毫秒提交一次偏移,就像上面的例子一样。但运气不好。知道我可能做错了什么吗?
FlinkKafkaSource
仅在启用检查点时提交偏移量,如中所述https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-偏移量提交。由于检查点不能在批处理模式下启用,因此偏移量也不会提交回Kafka。
查看您的源代码,您就是setProperties
,而文档讨论的是setProperty
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#additional-财产——也许这就是问题所在?