我在Spring Boot应用程序中配置了一个kafka监听器,如下所示:
@KafkaListener(topicPartitions = @TopicPartition(topic = 'data.all', partitions = { "0", "1", "2" }), groupId = "kms")
public void listen(ObjectNode message) throws JsonProcessingException {
// Code to convert to json string and write to ElasticSearch
}
该应用程序部署到 3 台服务器上并在这些服务器上运行,尽管它们的组 ID 均为kms
,但它们都会获得消息的副本,这意味着我在 Elastic 中获得了 3 条相同的记录。当我在本地运行实例时,会写入 4 个副本。
我已经确认生产者只向主题写入 1 条消息,方法是检查写入发生之前和之后主题上所有消息的计数;它只增加 1。我该如何防止这种情况?
当您手动分配此类分区时,您负责在实例之间分配分区。
出于分区分布的目的,将忽略该组,但如果需要,仍用于跟踪偏移量。
您必须使用组管理并让 Kafka 为您完成分区分配,或者为每个实例手动分配分区。
代替topicPartitions
,使用topics = "data.all"
不手动分配分区时会发生什么情况
生产者方面
- 当生产者在没有任何策略或指定消息应该转到哪个分区的情况下发送消息时,kafka 会尝试使用循环技术并将所有消息拆分到所有可用分区中。 2 个分区
- 中的消息是唯一的,因为建议最多只 1 个使用者来侦听主题的特定分区。
消费者侧
- 例如,一个主题有 2 个分区
- 然后一个消费者(假设
A
)加入消费者组(假设consumer
) - 每当新的使用者加入并且将 2 个分区分配给
A
时,都会发生分区重新分配,因为我们只有一个使用者组consumer
- 现在,消费者
B
尝试加入相同的消费者组consumer
然后再次进行分区重新分配,并且 A 和 B 都将获得分区来侦听消息
由于我们只有 2 个分区, - 即使我们向同一消费者组添加更多消费者,也只有 2 个消费者会监听发送到主题的消息,因为一次只有 2 个消费者可以获得 1-1 个分区。保持消费者使用的消息的独占性。
在您的情况下,不止 1 个使用者正在侦听相同的分区,因此侦听同一使用者组中相同分区的所有使用者也将收到来自该分区的消息。因此,由于超过 1 个消费者正在侦听相同的分区,因此消费者组中消费者之间的互斥性丢失。