我设置了一个spring-kafka消费者。它从主题中消费avro数据,映射值并写入CSV文件。当文件长度为25000条记录或每5分钟一次时,我手动提交偏移量-以先到的为准。
当我们重新启动应用程序时出现问题,因为补丁/版本。
我有一个这样的方法:
@PreDestroy
public void destroy() {
LOGGER.info("shutting down");
writeCsv(true);
acknowledgment.acknowledge(); // this normally commits the current offset
LOGGER.info("package commited: " + acknowledgment.toString());
LOGGER.info("shutting down completed");
}
所以我在那里添加了一些记录器,日志看起来是这样的:
08:05:47 INFO KafkaMessageListenerContainer$ListenerConsumer - myManualConsumer: Consumer stopped
08:05:47 INFO CsvWriter - shutting down
08:05:47 INFO CsvWriter - created file: FEEDBACK1630476236079.csv
08:05:47 INFO CsvWriter - package commited: Acknowledgment for ConsumerRecord(topic = feedback-topic, partition = 1, leaderEpoch = 17, offset = 544, CreateTime = 1630415419703, serialized key size = -1, serialized value size = 156)
08:05:47 INFO CsvWriter - shutting down completed
由于使用者在调用acknowledge()方法之前停止工作,因此永远不会提交偏移量。日志中没有错误,我们在应用程序再次启动后得到重复。
- 是否有一种方法可以在消费者被关闭之前调用方法?
还有一个问题:
我想在消费者上设置一个过滤器,像这样:
if(event.getValue().equals("GOOD") {
addCsvRecord(event)
} else {
acknowledgement.acknowledge() //to let it read next event
假设我得到了偏移量100 -并且GOOD事件来了,我将它添加到csv文件中,文件等待更多的记录,并且偏移量尚未提交。接下来出现一个BAD事件,它被过滤掉,并立即提交偏移量101。然后文件到达超时时间,即将关闭并调用
acknowlegdment.acknowledge()
- 那里可能会发生什么?以前的偏移量可以被承诺吗?
@PreDestroy
在上下文生命周期中太迟了-到那时容器已经停止了。
实现SmartLifecycle
,在stop()
中做确认。
对于你的第二个问题,只是不要提交坏的偏移;您仍然会得到下一个记录。
Kafka维护两个指针position
和committed
。它们是相关的,但对于正在运行的应用程序来说是独立的。