我有三个消费者,他们都有多个实例,这些实例都在消费同一个主题。我希望每个消费者消费一次这个话题。为此,我创建了一个消费者组。从我读到的内容来看,Kafka 应该足够聪明,可以选择一个服务实例来使用主题,但这并没有发生,所有消费者的所有实例都在消费该主题
我认为这可能与三个具有相同组名称的消费者有关,所以我关闭了两个消费者,留下一个消费者有两个实例,但是当消费者组应该只选择一个实例插入数据库时,我仍然看到两条记录进入数据库。
我做错了什么或错过了下面的东西吗?
应用.yml
spring:
cloud:
stream:
bindings:
input-data:
destination: publisheddata.t
group: publisheddata
kafka:
bindings:
input:
consumer:
autoCommitOffset: false
binder:
auto-create-topics: true
kafka:
mode: raw
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka:9092
zk-nodes: kafka:2181
频道.java
public interface Channels {
String INPUT_DATA = "input-data";
@Input(INPUT_DATA)
SubscribableChannel dataInput();
}
数据处理程序.java
@EnableBinding(Channels.class)
@Configuration
public class DataMessageHandler {
@StreamListener(Channels.INPUT_DATA)
public void handle(Message<?> message) {
... handling message ...
}
如果将来有人遇到此问题,在研究了几天后,我发现问题出在我正在使用的 Spring Cloud 版本上,我使用的是Brixton.RELEASE
,一旦我将其更新为Dalston.SR2
这解决了我的问题