我正在使用 Flink 1.4.0
我正在尝试将表 API 查询的结果保存到 CSV 文件,但我正在 收到错误。 以下是详细信息:
我的输入文件如下所示:
编号,物种,颜色,重量,名称 311,犬,金,75,狗1 312,犬,棕色,22,狗2 313,猫科动物,灰色,8,猫1
我对此运行查询以仅选择犬类,我想将其保存到 CSV 文件:
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
String inputPath = "location-of-source-file";
CsvTableSource petsTableSource = CsvTableSource.builder()
.path(inputPath)
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id", Types.INT())
.field("species", Types.STRING())
.field("color", Types.STRING())
.field("weight", Types.DOUBLE())
.field("name", Types.STRING())
.build();
// Register our table source
tableEnv.registerTableSource("pets", petsTableSource);
Table pets = tableEnv.scan("pets");
Table counts = pets
.groupBy("species")
.select("species, species.count as count")
.filter("species === 'canine'");
// Convert to Dataset and display results
DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
result.print();
// Write Results to File
TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets.csv", ",");
counts.writeToSink(sink);
当我运行它时,我看到数据集的结果被输出:
犬,2
但是我在输出文件中没有得到任何结果,我看到了这些 下面的错误。我该怎么做才能解决这个问题?谢谢!
2018-05-27 13:29:17,040 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1873( - class org.apache.flink.types.Row 不包含字段字段的 getter 2018-05-27 13:29:17,040 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1876( - class org.apache.flink.types.Row 不包含字段字段的 setter 2018-05-27 13:29:17,040 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1911( - class org.apache.flink.types.Row 不是有效的 POJO 类型,因为并非所有字段都是有效的 POJO 字段。 2018-05-27 13:29:17,047 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1873( - class org.apache.flink.types.Row 不包含字段字段的 getter 2018-05-27 13:29:17,047 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1876( - class org.apache.flink.types.Row 不包含字段字段的 setter 2018-05-27 13:29:17,047 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1911( - class org.apache.flink.types.Row 不是有效的 POJO 类型,因为并非所有字段都是有效的 POJO 字段。
您可能缺少一个
env.execute();
后
counts.writeToSink(sink);
与立即触发执行的print()
相反,writeToSink()
只是附加一个接收器运算符并需要显式触发执行。
TypeExtractor
的INFO
消息"只是"告诉您,Row
不能用作POJO类型,但这在这里很好。