Spring Boot Kafka侦听器不一致



我正在尝试让几个不同的Spring Cloud微服务都连接到一个Kafka/Zookeeper集群,所有这些都在Kubernetes中。微服务使用org.springframework.kafka:spring-kafka-作为事件的消费者和生产者。

所有的服务都连接到kafka好的-并且创建了主题;然而,每项服务的消费者都非常不一致。

例如,当服务启动一次时,所有使用者都将侦听消息,并调用函数。但是,当我重新启动一切(包括卡夫卡和动物园管理员(时,它要么不起作用,要么不同服务中的一些消费者会起作用等等。

以下是我的一些配置-我没有任何基于Java的配置-只是在我的应用程序中。yml如下:

spring:
....
kafka:
consumer:
bootstrap-servers: api-kafka.default.svc.cluster.local:9092
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: api-event
enable-auto-commit: false
producer:
bootstrap-servers: api-kafka.default.svc.cluster.local:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
ack-mode: manual
...

我的主要课程:

@EnableCaching
@SpringBootApplication
@EnableJpaRepositories
@EnableDiscoveryClient
@EnableKafka /* <<<<<<<------------- ENABLED HERE */
public class ExampleServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ExampleServiceApplication.class, args);
}
.....
}

最后,我的消费者:

@Component
public class MessageListener {
@KafkaListener(
topics = "myTopic")
public void eventListener(String serializedMessage) {
try {
....

这些消息被发送到代理很好,但没有被其他服务使用。

我意识到没有映射到每个服务属性上的主题,我如何通过application.yml做到这一点?

我打赌我犯了一个很小的错误,但是的!我真的很感谢任何意见或帮助

Btw,您可以在这里阅读更多关于分区数量和并行消费者(具有相同组id的消费者(数量之间关系的信息。

https://docs.confluent.io/platform/current/streams/architecture.html

稍微简化一下,应用程序运行的最大并行度受流任务的最大数量限制,流任务本身由应用程序正在读取的输入主题的最大分区数决定。例如,如果您的输入主题有5个分区,那么您最多可以运行5个应用程序实例。这些实例将协同处理主题的数据。如果您运行的应用程序实例数量大于输入主题的分区数量,则"多余"的应用程序示例将启动,但仍处于空闲状态;但是,如果其中一个繁忙实例发生故障,其中一个空闲实例将恢复前者的工作。我们在常见问题解答中提供了更详细的解释和示例。

相关内容

  • 没有找到相关文章

最新更新