什么是SourceFunction#run应该在Flink中工作



我通过为Flink不支持的消息队列扩展RichSourceFunction实现了一个Source。

当我实现签名为的run方法时

override def run(sc: SourceFunction.SourceContext[String]): Unit = {
val msg = read_from_mq
sc.collect(msg)
}

当调用run方法时,如果消息队列中没有更新的消息,

  1. 我应该在不调用sc.collect或的情况下运行吗

  2. 我可以等到更新的数据到来(在这种情况下,run方法将被阻止(。

我更喜欢第二种,不确定这是否是正确的用法。

Flink源的run方法应该循环,无休止地产生输出,直到调用其cancel方法。当没有什么可生产的时候,最好能找到一种方法来进行阻塞等待。

apachenifi源连接器是另一个可以用作模型的合理示例。你会注意到,当它无事可做时,它会休眠一段可配置的时间间隔

正如您可能知道的那样,这两个选项在功能上都是正确的,并且会产生正确的结果。

也就是说,第二个是首选,因为你没有抓住线索。事实上,如果您查看RabbitMQ连接器实现,您会注意到它正是这样实现的:在其run中,它间接地等待消息放置在BlockingQueue上。

相关内容

  • 没有找到相关文章