是否有一种方法,只读取X号消息在特定的时间段假设1分钟使用Akka kafka流消费者https://doc.akka.io/docs/alpakka-kafka/0.15/consumer.html通过一些配置。需要处理在特定时间有来自生产者的大量消息的情况,从而可能影响到消费者。
Akka Streams中的throttle
阶段可以用来限制元素在流中传递到下一阶段的速率(使用Scala API):
.throttle(100, 1.minute)
这仍然会处理弹幕中的每条消息,但如果两次爆发之间有足够的时间,处理可以赶上。
请注意,Akka Streams有一个强大的背压机制:下游阶段只需要足够的消息,他们可以处理。这实际上使处理过程自我节流:如果这还不够,或者你已经"选择退出",节流阶段是有用的;反压力。
如果爆发之间没有足够的时间来处理赶上,可以使用full - drop-message-if-full策略将buffer
阶段放在throttle
阶段之前。这将防止节流的反压力传播到Kafka消费者,例如:
.buffer(1000, OverflowStrategy.dropHead) // Drop the oldest message
.throttle(100, 1.minute)