有很多关于速率限制/限制Kafka消费者的老话题
- 如何在链接上使用速率限制器?
- 动态节流flink kafka源
- 等。
但是它们都不能在案例1.15中使用:
KafkaFetcher
不暴露emitRecord
- 现在它位于
KafkaRecordEmitter
- 现在它位于
FlinkKafkaConsumer
已弃用KafkaSource
是首选方法
KafkaSource
显式地在createReader
中创建KafkaRecordEmitter
所以,我的问题是,无论如何都有ThrottledIterator
和KafkaSource
的组合吗?
您不应该在主处理线程中应用速率限制,因为这会阻塞检查点。相反,您应该在反序列化模式中应用速率限制。
您可以在反序列化模式的open
方法中设置速率限制器,并在deserialize
方法中获取它。(但我会使用Guava RateLimiter而不是节流迭代器。)