进入
Flink 作业的数据可能会由于代码错误或缺乏验证而触发异常。我的目标是提供一致的异常处理方式,我们的团队可以在 Flink 作业中使用,不会导致生产中的任何停机时间。
重新启动策略似乎不适用于此处,因为:
- 简单的重启无法解决问题,我们陷入重启循环
- 我们不能简单地跳过事件
- 它们可能对 OOME 或某些暂时性问题有好处
- 我们无法添加自定义的
"keyBy"函数中的try/catch块不能完全帮助,因为:
- 处理异常后无法跳过"keyBy"中的事件
示例代码:
env.addSource(kafkaConsumer)
.keyBy(keySelector) // must return one result for one entry
.flatMap(mapFunction) // we can skip some entries here in case of errors
.addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
我希望能够跳过在"keyBy"和应该只返回一个结果的类似方法中导致问题的事件的处理。
除了@phanhuy152的建议(对我来说似乎是完全合法的(之外,为什么不在keyBy
之前filter
呢?
env.addSource(kafkaConsumer)
.filter(invalidKeys)
.keyBy(keySelector) // must return one result for one entry
.flatMap(mapFunction) // we can skip some entries here in case of errors
.addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
在这种情况下,
您可以保留一个特殊值(如"NULL"(供keyBy
返回吗?那么你的flatMap
函数在遇到这样的值时可以跳过吗?