我正在Flink 中尝试以下场景
- Flink使用kafka主题中的数据并根据avro模式进行验证
- 在对数据进行一些富集后,在process函数中将数据转换为JSON有效载荷
- 数据丰富后应写入Postgres数据库,并通过Flink RichSinkFunction将数据上传到Azure blob存储
我在Sink函数中被困在一个位置,该过程应该以事务方式发生,这意味着如果在将数据持久化到postgres时发生任何异常,或者在将数据上传到Azure blob存储时发生任何例外,该过程应通过异常,它也应该从数据库回滚数据,它应该从Azure blob存储中回滚数据。如果出现异常,接收到的有效负载Sink函数应该放一个kafka主题,但我不知道如何处理,我知道在processfunction中,它支持一个侧面,我们可以通过它将数据发送到不同的主题,但Sink不支持侧面输出。
有没有一种方法可以在出现任何异常的情况下,将Sink中接收到的有效载荷发布到Kaffa主题。
我不确定您现在使用的编程语言,但您可以在process function
中使用Scala
执行以下操作,并根据流程函数返回的输出调用sink methods
。
Try {
}
match {
case Success(x) => {
.
.
Right(x)
}
case Failure(err) => {
.
.
Left(err)
}
}
您的process element
方法如下所示:
override def process(key: Int,context: Context, elements: Iterable[String], out: Collector[(String, String)]): Unit = {
for (i <- elements) {
println("Inside Process.....")
parseJson(i) match {
case Right(data) => {
context.output(goodOutputTag, data)
out.collect(data) //usually used to collect records and emits them to writer.and so on,collect be called when needs to write data.
}
case Left(err) => {
context.output(badOutputTag, dataTuple) // side outputs, when needed to split a stream of data. Emit data to side output and a new datastream can be created using .getSideOutput(outputTag)
}
}
}
现在,使用Success
和Failure
案例中的这些output tags
,并在invoker object
中创建一个数据流,然后调用各自的sink方法。