错误处理的正确方式



我想知道在Flink中是否有一个内置错误处理的选项。可能有两种情况:

  1. 来自Kafka的当前消息(在我的情况下)是无效的,继续下一个

  2. uncaught exception -从我看到的它可以完全停止流聚合

我怎么处理这两种情况?(java代码)

1)这是用flatMap习惯地完成的:如果您的消息有效,则继续使用包含有效元素的列表(可能已经在同一步骤中处理过)。如果无效,只需返回一个空列表,这样该步骤就不会产生任何元素。我可以提供Scala代码,但我不熟悉Java api,所以我不想让你们偏离轨道。查看flatMap调用

2)这取决于异常的类型:如果它是由您自己的代码引发的,只需捕获它并在操作符内处理它,或者简单地记录它并继续前进。在没有任何关于具体情况的进一步信息的情况下,这是我所知道的最好的,但是,再次强调,来自Scala的我还没有经历过运行时异常。

相关内容

  • 没有找到相关文章

最新更新