我通过为Flink不支持的消息队列扩展RichSourceFunction
实现了一个Source。
当我实现签名为的run方法时
override def run(sc: SourceFunction.SourceContext[String]): Unit = {
val msg = read_from_mq
sc.collect(msg)
}
当调用run
方法时,如果消息队列中没有更新的消息,
我应该在不调用
sc.collect
或的情况下运行吗我可以等到更新的数据到来(在这种情况下,
run
方法将被阻止(。
我更喜欢第二种,不确定这是否是正确的用法。
Flink源的run方法应该循环,无休止地产生输出,直到调用其cancel方法。当没有什么可生产的时候,最好能找到一种方法来进行阻塞等待。
apachenifi源连接器是另一个可以用作模型的合理示例。你会注意到,当它无事可做时,它会休眠一段可配置的时间间隔
正如您可能知道的那样,这两个选项在功能上都是正确的,并且会产生正确的结果。
也就是说,第二个是首选,因为你没有抓住线索。事实上,如果您查看RabbitMQ连接器实现,您会注意到它正是这样实现的:在其run
中,它间接地等待消息放置在BlockingQueue
上。