Camel kafka with Spring FileStateRepository



我试图将 Kafka 偏移量保存到我使用的文件中 Spring 启动,似乎偏移量正在写入文件但没有读取,因此 Camel 将在重新启动时从 kafka 主题的开头开始读取

@Component
public class Route extends RouteBuilder {
@Override
public void configure() throws Exception {
from(kafka())
.to("log:TEST?level=INFO")
.process(Route::commitKafka);
}
private String kafka() {
String kafkaEndpoint = "kafka:";
kafkaEndpoint += "topic";
kafkaEndpoint += "?brokers=";
kafkaEndpoint += "localhost:9092";
kafkaEndpoint += "&groupId=";
kafkaEndpoint += "TEST";
kafkaEndpoint += "&autoOffsetReset=";
kafkaEndpoint += "earliest";
kafkaEndpoint += "&autoCommitEnable=";
kafkaEndpoint += false;
kafkaEndpoint += "&allowManualCommit=";
kafkaEndpoint += true;
kafkaEndpoint += "&offsetRepository=";
kafkaEndpoint += "#fileStore";
return kafkaEndpoint;
}
@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
// This will be empty
// System.out.println(fileStateRepository.getCache());
return fileStateRepository;
}
private static void commitKafka(Exchange exchange) {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
}
}

我终于找到了一个解决方案,但它没有出现在文档中 必须调用 start 方法才能在启动时初始化存储库

@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
try {
fileStateRepository.start();
} catch (Exception e) {
e.printStackTrace();
}

return fileStateRepository;
}

相关内容

  • 没有找到相关文章

最新更新