Apache Flink - "keyBy"中的异常处理


进入

Flink 作业的数据可能会由于代码错误或缺乏验证而触发异常。我的目标是提供一致的异常处理方式,我们的团队可以在 Flink 作业中使用,不会导致生产中的任何停机时间。

  1. 重新启动策略似乎不适用于此处,因为:

    • 简单的重启无法解决问题,我们陷入重启循环
    • 我们不能简单地跳过事件
    • 它们可能对 OOME 或某些暂时性问题有好处
    • 我们无法添加自定义的
  2. "keyBy"函数中的try/catch块不能完全帮助,因为:

    • 处理异常后无法跳过"keyBy"中的事件

示例代码:

env.addSource(kafkaConsumer)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");

我希望能够跳过在"keyBy"和应该只返回一个结果的类似方法中导致问题的事件的处理。

除了@phanhuy152的建议(对我来说似乎是完全合法的(之外,为什么不在keyBy之前filter呢?

env.addSource(kafkaConsumer)
    .filter(invalidKeys)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
在这种情况下,

您可以保留一个特殊值(如"NULL"(供keyBy返回吗?那么你的flatMap函数在遇到这样的值时可以跳过吗?

相关内容

  • 没有找到相关文章

最新更新