我在不同的帐户中拥有带有Flink应用程序的多级KDA。我有一个用例,我需要查看记录内容,以确定将数据推送到哪个AWS帐户(该帐户中的驱动蛋白流(。
链接显示了它可以根据记录内容选择流名称,我需要支持多个kinesis制作人来推送不同的AWS帐户。
有什么帮助吗?
作为一种替代方案,您可以使用Side Outputs为不同的AWS帐户配置专用接收器(因此,FlinkKinesisProducer
(。
你可以按如下方式制作:
val stream: DataStream[T] = ...
val account1OutputTag = OutputTag[T]("aws-account-1-output")
...
val accountNOutputTag = OutputTag[T]("aws-account-N-output")
val mainDataStream = stream
.process(new ProcessFunction[T, T] {
override def processElement(
value: T,
ctx: ProcessFunction[T, T]#Context,
out: Collector[T]): Unit = {
// emit data to regular output
out.collect(value)
// emit data to a corresponding side output
ctx.output(accountKOutputTag, value)
}
})
...
val account1SideOutputStream: DataStream[T] = mainDataStream
.getSideOutput(account1OutputTag)
.addSink(account1KinesisProducer)
...
val accountNSideOutputStream: DataStream[T] = mainDataStream
.getSideOutput(accountNOutputTag)
.addSink(accountNKinesisProducer)