Kafka Connect SourceTask的轮询间隔



我正在使用Kafka-Connect API实现一个自定义源连接器,可用于轮询REST-API并将JSON响应接收到Kafka主题中。

现在我想知道如何实现SourceTask的轮询间隔,JDBC连接器如何提供一个轮询间隔。在某个地方,我必须将线程设置为睡眠状态,但是我必须在哪里执行此操作?

我在SourceTask实现中通过添加类型为 long 的私有字段来存储时间戳来解决此用例。在第一次poll()调用时,该字段尚未初始化,因此将轮询配置的 REST-API。而第一次调用提到的long字段则使用当前时间戳进行初始化。在随后的所有poll()调用中,将检查上一个调用的时间戳。如果自上一次poll()轮询以来经过的毫秒数小于两次轮询之间配置的间隔,则由于配置的毫秒已过去,我将线程送入睡眠状态。

使用 max.poll.interval.ms .

请参考此链接: https://kafka.apache.org/documentation/

相关内容

  • 没有找到相关文章

最新更新