在我的 Flink 代码中,我使用的是自定义输入格式,这会引发异常。似乎我需要RuntimeContext
的实例,但是我怎样才能得到一个呢?
我的格式类如下所示:
MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T]{
@transient var lineCounter: IntCounter = _
override def open(split: FileInputSplit): Unit = {
super.open(split)
lineCounter = new IntCounter()
getRuntimeContext.addAccumulator("rowsInFile", lineCounter) // this line throws IllegalStateException
我的主程序如下所示:
val env = ExecutionEnvironment.getExecutionEnvironment
val format = new MyInputFormat
env.readFile(format, inputFile.getAbsolutePath) // throws exception
引发的异常:
java.lang.IllegalStateException: The runtime context has not been initialized yet. Try accessing it in one of the other life cycle methods.
at org.apache.flink.api.common.io.RichInputFormat.getRuntimeContext(RichInputFormat.java:51)
我的类需要一个RuntimeContext
,因为它扩展了DelimitedInputFormat
扩展了...... RichInputFormat
public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit>
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T>
private transient RuntimeContext runtimeContext;
public void setRuntimeContext(RuntimeContext t)
public RuntimeContext getRuntimeContext()
因此,任何RichInputFormat
实例都希望我们在创建后setRuntimeContext(RuntimeContext t)
。
我希望我应该执行以下操作:
val env = ExecutionEnvironment.getExecutionEnvironment
val runtimeContext: RuntimeContext = ??? // How do I get this?
val format = new MyInputFormat
format.setRuntimeContext(runtimeContext)
env.readFile(format, inputFile.getAbsolutePath) // no longer throws exception
但是如何获取 RuntimeContext 的实例呢?抛出异常是因为我的自定义输入格式没有RuntimeContext
。我会设置一个,但我不知道从哪里得到它。
您应该在生命周期方法中初始化 RuntimeContext,例如open
MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T] {
override def openInputFormat() = {
getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
}
我在 Flink 中遇到了同样的问题。看起来 setRuntimeContext 是由 Flink 在引擎盖下自动调用的,而不是在open
调用期间调用的,我找不到任何明显的文档来解释这一点。但是你可以做一些类似的事情
lazy val acc = getRuntimeContext.addAccumulator(accName, acc)
在您的类定义中,然后调用
acc.add(v)
在代码中的其他点,这保证被初始化,例如在 Flink 类的重写方法之一中。
不明白为什么,但似乎MyInputFormat
被实例化了几次,包括在RuntimeContext
可用之前。但是,尽管如此,该作业仍有效并计算需要执行的操作。我已经通过在try
中封装所有对addAccumulator(,)
的调用来解决此问题,如下所示:
private def addAccumulator(accName: String, acc: SimpleAccumulator[_]): Unit = {
try {
val rc = getRuntimeContext.getAccumulator(accName) // throws if RuntimeContext not yet set
if (rc == null) getRuntimeContext.addAccumulator(accName, acc)
} catch {
case NonFatal(_) =>
}
}
尽管我在 open()
内部调用addAccumulator(,)
,但我还是需要这样做,这似乎是正确的生命周期方法。另外:由于并行性,几个子作业试图添加相同的累加器,这是错误的。这就是为什么我试图先获得累加器。如果还没有上下文,没问题:我稍后会得到一个。如果累加器已经存在,没问题 - 无事可做。这只是一种解决方法,而不是解决方案 - 但这就是我现在所拥有的。