我正在使用 spring-kafka 来消费来自两个 Kafka 主题的消息,它发送的消息格式如下。
@KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
log.info("Message : {} is received", message);
ack.acknowledge();
}
- KafkaListener 能否根据自己监听的主题数和两个主题中的并行进程消息来分配使用者线程数?或者它不支持并行处理,消息必须在主题中等待,直到处理一条消息?
- 如果主题中的消息数更高,我需要自动扩展我的微服务以启动新实例(直到分区数(。从 KafkaListener 的角度来看,我可以依靠哪些参数(CPU、内存(来找出主题中的消息数量更高?(即在 API 中,我可以通过监控 HTTP 延迟来自动扩展服务(
可以将concurrency
属性设置为运行更多线程;但每个分区只能由一个线程处理。要提高并发性,您必须增加每个主题中的分区数。在同一侦听器中侦听多个主题时,如果这些主题只有一个分区,则除非更改 kafka 使用者分区分配器,否则可能无法获得所需的并发性。
请参阅 https://docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/#using-ConcurrentMessageListenerContainer
侦听多个主题时,默认分区分布可能不是您所期望的。例如,如果您有三个主题,每个主题有五个分区,并且您希望使用 concurrency = 15,则只会看到五个活动使用者,每个使用者从每个主题分配一个分区,其他 10 个使用者处于空闲状态。这是因为默认的 Kafka PartitionAssignor 是 RangeAssignor(参见其 Javadoc(。对于此方案,您可能需要考虑改用轮循环分配器,它将分区分布到所有使用者。然后,为每个使用者分配一个主题或分区。...
如果要在分区计数之外进行水平扩展并动态扩展 - 请考虑使用类似并行使用者 (PC( 的内容。它可以在 Spring 上下文中使用。
通过使用 PC,您可以并行处理所有密钥,无论处理需要多长时间,并且您可以根据需要并发 - 这可以动态扩展。
PC 直接解决了这个问题,通过按键对输入分区进行子分区并并行处理每个键。 它还跟踪每个记录确认。在GitHub上查看Parallel Consumer(它是开源的BTW,我是作者(。