我试图在Flink中执行的部分代码:
val pages = env.readCsvFile[(Long)]("/home/ppi.csv",
fieldDelimiter = "t", includedFields = Array(1))
我想将pages
用于其他目的,但当我编译时,Flink向我抛出错误消息
main线程异常java.lang.ClassCastException:
不能将typeinfo. integertypeinfo强制转换为
org.apache.flink.api.java.typeutils.PojoTypeInfo
顺便说一下,我使用的是Flink的0.9快照版本。
如果从CSV文件中读取,返回类型将是包含所有读取字段的Scala元组。在您的示例中,您只读取一个字段,该字段将给出Tuple1。这就是你试图用"Long"周围的圆括号指定的内容:
readCsvFile[(Long)]
在Scala中,您只能使用括号指定包含两个或多个字段的元组。所以你需要写
readCsvFile[Tuple1[Long]]
抛出异常,因为Flink的CSVInputFormat试图将所有非元组类型解释为Pojo类型。