如何为单个flink作业配置设置两个不同的流程功能



我有一个带有3个不同输入(可选(的flink作业,并且每种类型的输入都会发出相同的输出。

input1使用KeyedProcessFunction()input2使用ProcessWindowFuction()

基本上,作业输入是三个输入和一个输出的联合。我们如何配置flink作业,以便对于单个作业,我可以使用以上两个流程函数。

我想用KeyedProcessFunction()代替input1,用ProcessWindowFuction()代替input2

以下代码仅适用于input2,如果我想使用input1,我必须在作业配置中使用.process(processFuction())而不是.process(MyProcessWindowFunction()),我们如何配置才能在单个作业中使用这两个功能?

fun setupJob(env: StreamExecutionEnvironment) {
val testStream = env.sampleStream()
.keyBy { it.f0 }
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.process(MyProcessWindowFunction())

testStream.map { it.toKafkaMessage() }
.kafkaSink<SampleOutput>() }
}

单个Flink作业可以包含多个管道。例如,

env.fromSource(input1)
.keyBy(...)
.process(new MyKeyedProcessFunction())
.sinkTo(sink1)
env.fromSource(input2)
.keyBy(...)
.window(...)
.process(new MyProcessWindowFunction())
.sinkTo(sink2)
env.execute()

最新更新