在Spring Boot中配置Kafka消费者=失去对消费者行为的控制



我真的很喜欢使用Spring Boot中所有可用的配置来配置Kafka消费者:

ConsumerFactory() => KafkaListenerContainerFactory() => consume(V message)

然而,我似乎也失去了对apachekafka包附带的消费者行为的所有控制,比如在同步和异步提交之间来回切换,明确启动消费者并干净地关闭它。有了Springkafka接口,您只需要实现一个方法,就可以开始获得流入的消息:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeString(String message) {
System.out.println("Consumed message: " + message);
}

在我看来,所有这些消耗都发生在Spring Boot自动提供的一个单独的线程上。。。有人能告诉我,在使用Spring Boot配置KafkaConsumers(以及同样的KafkaProducers(时,我如何仍然保持所有这些控制吗?谢谢

在我看来,所有这些消耗都发生在Spring Boot自动提供的单独线程上

没错,spring将kafka消费者包装在侦听器容器中。

您最多可以通过容器属性来配置行为。

理论上,你可以保留卡夫卡消费者工厂返回的最新值,但你需要记住,对于一个容器,可以多次创建一个消费者。无论如何,通过保留引用,您可以在没有反射的情况下访问底层kafka消费者。但这是一个黑客攻击,可能会影响容器的行为。

最新更新