如何在Apache Flink中收集富接收器函数的端输出



我正在Flink 中尝试以下场景

  1. Flink使用kafka主题中的数据并根据avro模式进行验证
  2. 在对数据进行一些富集后,在process函数中将数据转换为JSON有效载荷
  3. 数据丰富后应写入Postgres数据库,并通过Flink RichSinkFunction将数据上传到Azure blob存储

我在Sink函数中被困在一个位置,该过程应该以事务方式发生,这意味着如果在将数据持久化到postgres时发生任何异常,或者在将数据上传到Azure blob存储时发生任何例外,该过程应通过异常,它也应该从数据库回滚数据,它应该从Azure blob存储中回滚数据。如果出现异常,接收到的有效负载Sink函数应该放一个kafka主题,但我不知道如何处理,我知道在processfunction中,它支持一个侧面,我们可以通过它将数据发送到不同的主题,但Sink不支持侧面输出。

有没有一种方法可以在出现任何异常的情况下,将Sink中接收到的有效载荷发布到Kaffa主题。

我不确定您现在使用的编程语言,但您可以在process function中使用Scala执行以下操作,并根据流程函数返回的输出调用sink methods

Try {

} 
match {
case Success(x) => {
.
.
Right(x)
}
case Failure(err) => {
.
.
Left(err)
}
}  

您的process element方法如下所示:

override def process(key: Int,context: Context, elements: Iterable[String], out: Collector[(String, String)]): Unit = {
for (i <- elements) {
println("Inside Process.....")
parseJson(i) match {
case Right(data) => {
context.output(goodOutputTag, data)
out.collect(data) //usually used to collect records and emits them to writer.and so on,collect be called when needs to write data.

}
case Left(err) => {
context.output(badOutputTag, dataTuple) // side outputs, when needed to split a stream of data. Emit data to side output and a new datastream can be created using .getSideOutput(outputTag)

}
}
}

现在,使用SuccessFailure案例中的这些output tags,并在invoker object中创建一个数据流,然后调用各自的sink方法。

相关内容

  • 没有找到相关文章

最新更新