如何在不同线程中处理@kafkalistener方法



我在春季启动中有kafka处理程序:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }

例如,生产者每秒发送一条消息。但是myService.processResponse工作10秒。我需要处理每条消息,然后在新线程中启动myService.processResponse。我可以创建执行者并将每个响应委托给它。但是我认为Kafka中还有另一种配置。我发现2:

1)将concurrency = "5"添加到@KafkaListener注释中 - 它似乎在起作用。但是我不确定有多正确,因为我有第二种方法:

2)我可以创建ConcurrentKafkaListenerContainerFactory并将其设置为ConsumerFactoryconcurrency

我不明白这些方法之间的区别吗?仅将concurrency = "5"添加到@KafkaListener注释还是我需要创建ConcurrentKafkaListenerContainerFactory,这足够了吗?

或我根本不了解任何东西,还有另一种方法吗?

使用遗嘱执行人在管理承诺的偏移方面变得复杂;不建议。

使用@KafkaListener,该框架为您创建ConcurrentKafkaListenerContainerFactory

注释上的

concurrency只是一个方便。它覆盖了工厂设置。

这使您可以与多个侦听器一起使用同一工厂,每个侦听器都有不同的并发。

您可以使用引导属性设置容器并发(默认);该值被注释值所覆盖;请参阅Javadocs ...

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";

concurrency选项与同时处理同一消费者接收的消息无关。当您有多个消费者处理自己的分区时,它是针对消费者群体的。

我相信,将处理传递到单独的线程非常复杂,我相信Spring-Kafka团队决定不这样做"设计"。您甚至不需要挖掘春季卡夫卡即可了解原因。检查KafkaconSumer的检测消费者故障 doc:

必须注意确保承诺的偏移没有 在实际位置之前。通常,您必须自动禁用 仅在 线程已经完成了处理(取决于交货 您需要的语义)。还请注意,您需要暂停 分区,以便直到之后才从民意调查中收到新记录 线程已经完成了以前返回的那些。

我在这里发现的有点旧问题,但我认为我没有看到有关在不同线程上处理消息的实际答案。

如果您不介意不考虑处理的消息,则可以在处理逻辑上利用@Async(并确保您的项目上的项目具有@EnableAsync)。

阅读侦听器中的消息:

  @KafkaListener(topics = "my_topic")
  void listen(ConsumerRecord<String, String> record) {
    messageHandlingService.processMessage(record.value());
  }

然后在单独的服务中,注释您的异步处理消息:

 @Async
  public void processMessage(String messageString) {
    // do stuff here
  }

现在,您可以读取多个消息而不会通过处理这些消息。有了所有事物,都有挑战和警告和可能的配置,但这可能有助于您开始。

相关内容

  • 没有找到相关文章

最新更新