我有一个Spring Boot应用程序,它使用Spring Kafka侦听Kafka主题。消息被消费后,会使用消息中的信息执行几个web/rest服务调用,以收集一些其他数据,这个过程需要一些时间。因此,我使用了一个大小为20的线程池来创建一个并行消息处理。
该系统通常运行良好,但很少在短时间(1秒(内向Kafka主题发送/生成大量消息(约200K(。在这种情况下,消费者会立即消费消息,但消息处理机制不够快。因此,在等待线程时,所有消耗的消息都会留在内存中,应用程序会得到OutOfMemoryError。
将线程池大小增加到某个程度可能是一种改进,但这并不是解决这个问题的永久方案。我想在一段时间内消耗的消息数量和处理的消息数量之间建立一个平衡。这可以限制从Kafka主题消费的消息数量,或者在有可能立即处理消息时消费消息。
是否有任何Kafka消费者配置来限制一段时间内的消息数量?当消息消耗的延迟不是问题时,我如何优化消耗和处理机制?
PS:似乎没有对两次后续轮询之间的时间间隔进行配置(每次轮询之间的延迟时间是多少(,如果存在,可能会有该配置的解决方案。
这是我的消费者代码:
@Autowired
MessageProcessUtil messageProcessUtil;
private ExecutorService executor = Executors.newFixedThreadPool(20);
@KafkaListener(topics = "${kafka.consumer.topicName}")
public void consume(String message){
logger.info(String.format("$$ -> Consumed Message -> %s",message));
messageProcessUtil.processMessage(message, executor);
}
消费者配置:
kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.request.timeout.ms=40000
kafka.consumer.session.timeout.ms=30000
kafka.consumer.max.poll.records=1
kafka.consumer.fetch.max.wait.ms=500
kafka.consumer.auto.offset.reset=earliest
提前感谢您的帮助。
添加对已消耗Kafka记录的异步处理不是一个好主意;它造成了抵消管理方面的问题;在@KafkaListener
上使用concurreny
可以添加更多的使用者(主题上至少需要那么多分区(。
我们有一个非常相似的要求,并使用谷歌的Guava框架来实现速率限制。这个框架有不同的选项,比如限制每个时间段的请求数量,或者允许的请求总数等。有一个很好的例子说明了如何使用它。
Guava限速器教程
由于您将消息消耗和消息处理分开,因此没有配置来实现您想要的。
但是您可以使用BlockingQueue来实现它。您设置了队列的最大数量,让一个拉线程将消息从kafka拉到队列,并让进程线程消耗队列中的消息。当队列变满时,拉线程将阻塞并降低拉速率。