Kafka -在应用程序停止时关闭消费者之前的提交偏移量+过去的提交偏移量



我设置了一个spring-kafka消费者。它从主题中消费avro数据,映射值并写入CSV文件。当文件长度为25000条记录或每5分钟一次时,我手动提交偏移量-以先到的为准。

当我们重新启动应用程序时出现问题,因为补丁/版本。

我有一个这样的方法:

@PreDestroy
public void destroy() {
LOGGER.info("shutting down");
writeCsv(true); 
acknowledgment.acknowledge(); // this normally commits the current offset
LOGGER.info("package commited: " + acknowledgment.toString());
LOGGER.info("shutting down completed");
}

所以我在那里添加了一些记录器,日志看起来是这样的:

08:05:47  INFO KafkaMessageListenerContainer$ListenerConsumer - myManualConsumer: Consumer stopped
08:05:47  INFO CsvWriter - shutting down
08:05:47  INFO CsvWriter - created file: FEEDBACK1630476236079.csv
08:05:47  INFO CsvWriter - package commited: Acknowledgment for ConsumerRecord(topic = feedback-topic, partition = 1, leaderEpoch = 17, offset = 544, CreateTime = 1630415419703, serialized key size = -1, serialized value size = 156)
08:05:47  INFO CsvWriter - shutting down completed

由于使用者在调用acknowledge()方法之前停止工作,因此永远不会提交偏移量。日志中没有错误,我们在应用程序再次启动后得到重复。

  1. 是否有一种方法可以在消费者被关闭之前调用方法?

还有一个问题:

我想在消费者上设置一个过滤器,像这样:

if(event.getValue().equals("GOOD") {
addCsvRecord(event) 
} else {
acknowledgement.acknowledge() //to let it read next event

假设我得到了偏移量100 -并且GOOD事件来了,我将它添加到csv文件中,文件等待更多的记录,并且偏移量尚未提交。接下来出现一个BAD事件,它被过滤掉,并立即提交偏移量101。然后文件到达超时时间,即将关闭并调用

acknowlegdment.acknowledge() 

  1. 那里可能会发生什么?以前的偏移量可以被承诺吗?

@PreDestroy在上下文生命周期中太迟了-到那时容器已经停止了。

实现SmartLifecycle,在stop()中做确认。

对于你的第二个问题,只是不要提交坏的偏移;您仍然会得到下一个记录。

Kafka维护两个指针positioncommitted。它们是相关的,但对于正在运行的应用程序来说是独立的。

相关内容

  • 没有找到相关文章

最新更新