我正在使用Kafka-Connect API实现一个自定义源连接器,可用于轮询REST-API并将JSON响应接收到Kafka主题中。
现在我想知道如何实现SourceTask的轮询间隔,JDBC连接器如何提供一个轮询间隔。在某个地方,我必须将线程设置为睡眠状态,但是我必须在哪里执行此操作?
我在SourceTask
实现中通过添加类型为 long
的私有字段来存储时间戳来解决此用例。在第一次poll()
调用时,该字段尚未初始化,因此将轮询配置的 REST-API。而第一次调用提到的long
字段则使用当前时间戳进行初始化。在随后的所有poll()
调用中,将检查上一个调用的时间戳。如果自上一次poll()
轮询以来经过的毫秒数小于两次轮询之间配置的间隔,则由于配置的毫秒已过去,我将线程送入睡眠状态。
使用 max.poll.interval.ms
.
请参考此链接: https://kafka.apache.org/documentation/