Flink get RuntimeContext



在我的 Flink 代码中,我使用的是自定义输入格式,这会引发异常。似乎我需要RuntimeContext的实例,但是我怎样才能得到一个呢?

我的格式类如下所示:

MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T]{
@transient var lineCounter: IntCounter = _
override def open(split: FileInputSplit): Unit = {
    super.open(split)
    lineCounter = new IntCounter()
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter) // this line throws IllegalStateException

我的主程序如下所示:

val env = ExecutionEnvironment.getExecutionEnvironment
val format = new MyInputFormat
env.readFile(format, inputFile.getAbsolutePath) // throws exception

引发的异常:

java.lang.IllegalStateException: The runtime context has not been initialized yet. Try accessing it in one of the other life cycle methods.
    at org.apache.flink.api.common.io.RichInputFormat.getRuntimeContext(RichInputFormat.java:51)

我的类需要一个RuntimeContext,因为它扩展了DelimitedInputFormat扩展了...... RichInputFormat

public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit>
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T>
    private transient RuntimeContext runtimeContext;
    public void setRuntimeContext(RuntimeContext t)
    public RuntimeContext getRuntimeContext()

因此,任何RichInputFormat实例都希望我们在创建后setRuntimeContext(RuntimeContext t)

我希望我应该执行以下操作:

val env = ExecutionEnvironment.getExecutionEnvironment
val runtimeContext: RuntimeContext = ??? // How do I get this?
val format = new MyInputFormat
format.setRuntimeContext(runtimeContext)
env.readFile(format, inputFile.getAbsolutePath) // no longer throws exception

但是如何获取 RuntimeContext 的实例呢?抛出异常是因为我的自定义输入格式没有RuntimeContext。我会设置一个,但我不知道从哪里得到它。

您应该在生命周期方法中初始化 RuntimeContext,例如open

MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T] {
override def  openInputFormat() = {
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
}

我在 Flink 中遇到了同样的问题。看起来 setRuntimeContext 是由 Flink 在引擎盖下自动调用的,而不是在open调用期间调用的,我找不到任何明显的文档来解释这一点。但是你可以做一些类似的事情

lazy val acc = getRuntimeContext.addAccumulator(accName, acc)

在您的类定义中,然后调用

acc.add(v)

代码中的其他点,这保证被初始化,例如在 Flink 类的重写方法之一中。

我还

不明白为什么,但似乎MyInputFormat被实例化了几次,包括在RuntimeContext可用之前。但是,尽管如此,该作业仍有效并计算需要执行的操作。我已经通过在try中封装所有对addAccumulator(,)的调用来解决此问题,如下所示:

  private def addAccumulator(accName: String, acc: SimpleAccumulator[_]): Unit = {
    try {
      val rc = getRuntimeContext.getAccumulator(accName) // throws if RuntimeContext not yet set
      if (rc == null) getRuntimeContext.addAccumulator(accName, acc)
    } catch {
      case NonFatal(_) =>
    }
  }

尽管我在 open() 内部调用addAccumulator(,),但我还是需要这样做,这似乎是正确的生命周期方法。另外:由于并行性,几个子作业试图添加相同的累加器,这是错误的。这就是为什么我试图先获得累加器。如果还没有上下文,没问题:我稍后会得到一个。如果累加器已经存在,没问题 - 无事可做。这只是一种解决方法,而不是解决方案 - 但这就是我现在所拥有的。

相关内容

  • 没有找到相关文章

最新更新