可以在需要时及时调用Flink Map(未在输入流上激活)



我有一个flink中的地图,一旦数据通过流来激活。

即使没有数据,我也想调用该地图。

我将地图移至函数(无限函数调用(中,但是flink作业永远不会运行。而且,如果我将其添加到地图中,则只有在数据通过时才被激活。

这个想法是,在Infinte循环中具有1个映射,检查一些共享变量和另一个flink流监视Kafka队列,如果数据出现在IT过程中,它会更改一个共享变量,该变量以某种方式影响无限循环。

如何调用无限循环地图并将弗林链图运行?

我尝试创建一个带有随机数据的CollectionMap来激活流和地图以调用无限循环,但是即使MAP中有一段时间(true(条件,也几乎立即退出

在IDE中,当我将其推到flink时。本地,它几乎立即离开循环

流1

    val data_stream = env.addSource(myConsumer)
      .map(x => {process(x)})

流2

    val elements = List[String]("Start")
    var read = env.fromElements(elements).map(x => ProcessData.infinteLoop())

您可以创建一个windowtrigger,并每次x秒调用地图。

您可以找到文档:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

示例:

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 val data_stream = env.addSource(myConsumer)
  .map(x => {process(x)})
val window: DataStream[String] = data_stream
  .windowAll(GlobalWindows.create())
  .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
  .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})

相关内容

  • 没有找到相关文章

最新更新