如何在scala中使用flink折叠函数



这是一个用scala匿名函数使用Flink折叠的不正常的尝试:

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])

它编译得很好,但是在执行时,我遇到了一个"类型擦除问题"(见下文)。在Java中这样做是可以的,但当然更冗长。我喜欢简洁明了的lambdas。在scala中怎么做呢?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

您遇到的问题是Flink[1]中的错误。问题源于Flink的TypeExtractor和Scala数据流API在Java实现之上实现的方式。TypeExtractor不能为Scala类型生成TypeInformation,因此返回MissingTypeInformation。这个缺失的类型信息是在创建StreamFold操作符之后手动设置的。然而,StreamFold操作符的实现方式不接受MissingTypeInformation,因此,在设置正确的类型信息之前失败。

我已经打开了一个pull request[2]来解决这个问题。它应该会在接下来的两天内合并。通过使用最新的0.10快照版本,您的问题应该可以解决。

  • [1] https://issues.apache.org/jira/browse/flink - 2631
  • [2] https://github.com/apache/flink/pull/1101

相关内容

  • 没有找到相关文章

最新更新