这是一个用scala匿名函数使用Flink折叠的不正常的尝试:
val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])
它编译得很好,但是在执行时,我遇到了一个"类型擦除问题"(见下文)。在Java中这样做是可以的,但当然更冗长。我喜欢简洁明了的lambdas。在scala中怎么做呢?
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined.
This is most likely a type erasure problem.
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
您遇到的问题是Flink[1]中的错误。问题源于Flink的TypeExtractor
和Scala数据流API在Java实现之上实现的方式。TypeExtractor
不能为Scala类型生成TypeInformation
,因此返回MissingTypeInformation
。这个缺失的类型信息是在创建StreamFold
操作符之后手动设置的。然而,StreamFold
操作符的实现方式不接受MissingTypeInformation
,因此,在设置正确的类型信息之前失败。
我已经打开了一个pull request[2]来解决这个问题。它应该会在接下来的两天内合并。通过使用最新的0.10快照版本,您的问题应该可以解决。
- [1] https://issues.apache.org/jira/browse/flink - 2631
- [2] https://github.com/apache/flink/pull/1101