我有一个flink中的地图,一旦数据通过流来激活。
即使没有数据,我也想调用该地图。
我将地图移至函数(无限函数调用(中,但是flink作业永远不会运行。而且,如果我将其添加到地图中,则只有在数据通过时才被激活。
这个想法是,在Infinte循环中具有1个映射,检查一些共享变量和另一个flink流监视Kafka队列,如果数据出现在IT过程中,它会更改一个共享变量,该变量以某种方式影响无限循环。
如何调用无限循环地图并将弗林链图运行?
我尝试创建一个带有随机数据的CollectionMap来激活流和地图以调用无限循环,但是即使MAP中有一段时间(true(条件,也几乎立即退出
在IDE中,当我将其推到flink时。本地,它几乎立即离开循环
流1
val data_stream = env.addSource(myConsumer)
.map(x => {process(x)})
流2
val elements = List[String]("Start")
var read = env.fromElements(elements).map(x => ProcessData.infinteLoop())
在您可以创建一个window
和trigger
,并每次x
秒调用地图。
您可以找到文档:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
示例:
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
val data_stream = env.addSource(myConsumer)
.map(x => {process(x)})
val window: DataStream[String] = data_stream
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
.apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})