我一直在Flink中运行InvalidTypesException
,通常是在自定义通用SourceFunction<OUT>
时。下面是一个例子,当添加到我的StreamExecutionEnvironment时,它会在运行时抛出以下异常:
public class MyCustomSource<OUT> extends RichSourceFunction<OUT> {
@Override
public void run(SourceContext<OUT> sourceContext) throws Exception {
OUT foo = null;
// ... creates foo somehow ...
sourceContext.collect(foo);
}
@Override
public void cancel() {
// ...
}
}
相关的例外文本为:
由:org.apache.flink.api.common.functions.InvalidTypesException引起:无法确定"class org.apache.frink.streaming.api.functions.source.RichSourceFunction"中TypeVariable"OUT"的类型。这很可能是一个类型擦除问题。类型提取目前仅在返回类型中的所有变量都可以从输入类型推导出来的情况下才支持具有泛型变量的类型。
无论OUT
是POJO、Generic类型、Flink内部类型(如Tuple)等,都会发生这种情况。
我已经找到了一种可靠的方法来避免这种情况,通过returns()
方法添加Type Hint。例如:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MyCustomSource<String>())
.returns(String.class)
//.etc.
但这种方法在flink 1.1.4中被弃用;有人知道什么是不推荐使用的提供类型提示的方法吗?Flink内部维基只提到了returns()
,但它上一次更新是在一年多前。
您的MyCustomSource
应该实现ResultTypeQueryable
接口,以将类型作为TypeInformation
返回给Flink。
请参阅https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/common/index.html#type-擦除类型推断