我在春季启动中有kafka处理程序:
@KafkaListener(topics = "topic-one", groupId = "response")
public void listen(String response) {
myService.processResponse(response);
}
例如,生产者每秒发送一条消息。但是myService.processResponse
工作10秒。我需要处理每条消息,然后在新线程中启动myService.processResponse
。我可以创建执行者并将每个响应委托给它。但是我认为Kafka中还有另一种配置。我发现2:
1)将concurrency = "5"
添加到@KafkaListener
注释中 - 它似乎在起作用。但是我不确定有多正确,因为我有第二种方法:
2)我可以创建ConcurrentKafkaListenerContainerFactory
并将其设置为ConsumerFactory
和concurrency
我不明白这些方法之间的区别吗?仅将concurrency = "5"
添加到@KafkaListener
注释还是我需要创建ConcurrentKafkaListenerContainerFactory
,这足够了吗?
或我根本不了解任何东西,还有另一种方法吗?
使用遗嘱执行人在管理承诺的偏移方面变得复杂;不建议。
使用@KafkaListener
,该框架为您创建ConcurrentKafkaListenerContainerFactory
。
concurrency
只是一个方便。它覆盖了工厂设置。
这使您可以与多个侦听器一起使用同一工厂,每个侦听器都有不同的并发。
您可以使用引导属性设置容器并发(默认);该值被注释值所覆盖;请参阅Javadocs ...
/**
* Override the container factory's {@code concurrency} setting for this listener. May
* be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
* which case {@link Number#intValue()} is used to obtain the value.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the concurrency.
* @since 2.2
*/
String concurrency() default "";
concurrency
选项与同时处理同一消费者接收的消息无关。当您有多个消费者处理自己的分区时,它是针对消费者群体的。
我相信,将处理传递到单独的线程非常复杂,我相信Spring-Kafka团队决定不这样做"设计"。您甚至不需要挖掘春季卡夫卡即可了解原因。检查KafkaconSumer的检测消费者故障 doc:
必须注意确保承诺的偏移没有 在实际位置之前。通常,您必须自动禁用 仅在 线程已经完成了处理(取决于交货 您需要的语义)。还请注意,您需要暂停 分区,以便直到之后才从民意调查中收到新记录 线程已经完成了以前返回的那些。
我在这里发现的有点旧问题,但我认为我没有看到有关在不同线程上处理消息的实际答案。
如果您不介意不考虑处理的消息,则可以在处理逻辑上利用@Async
(并确保您的项目上的项目具有@EnableAsync
)。
阅读侦听器中的消息:
@KafkaListener(topics = "my_topic")
void listen(ConsumerRecord<String, String> record) {
messageHandlingService.processMessage(record.value());
}
然后在单独的服务中,注释您的异步处理消息:
@Async
public void processMessage(String messageString) {
// do stuff here
}
现在,您可以读取多个消息而不会通过处理这些消息。有了所有事物,都有挑战和警告和可能的配置,但这可能有助于您开始。