在Spring Kafka中启动多个Kafka监听器



我们的一个开发团队正在做一些我以前从未见过的事情。

首先,他们为消费者定义了一个抽象类。

public abstract class KafkaConsumerListener {
protected void processMessage(String xmlString) {

}
}

然后,他们使用10个类,如下面的类,创建10个个人消费者。

@Component
public class <YouNameIt>Consumer extends KafkaConsumerListener {
private static final String <YouNameIt> = "<YouNameIt>";
@KafkaListener(topics = "${my-configuration.topicname}",
groupId = "${my-configuration.topicname.group-id}",
containerFactory = <YouNameIt>)
public void listenToStuff(@Payload String message) {
processMessage(message);
}   
}

因此,他们试图启动10个Kafka监听器(每个监听器一个类/对象(。每个监听器都应该有自己的使用者组(有自己的名称(,并从一个(但不同的(主题进行消费。

他们似乎使用了不同的ConcurrentKafkaListenerContainerFactories,每个都有@Bean注释,因此他们可以为每个容器工厂分配不同的groupId

Spring Kafka支持这样的东西吗?

它似乎一直有效到几天前,现在似乎有一个消费者群体一直处于困境。它开始,读取很少的记录,然后挂起,消费者滞后越来越大

有什么想法吗?

是的,它是受支持的,但不必仅仅为了更改组id而创建多个工厂-注释上的groupId属性会覆盖工厂属性。

像你描述的那样的问题很可能是消费者的话题;"卡住";在某个地方的用户代码中;进行线程转储以查看线程在做什么。

最新更新