如何将链接表保存到csv文件?



我正在使用 Flink 1.4.0

我正在尝试将表 API 查询的结果保存到 CSV 文件,但我正在 收到错误。 以下是详细信息:

我的输入文件如下所示:

编号,物种,颜色,重量,名称 311,犬,金,75,狗1 312,犬,棕色,22,狗2 313,猫科动物,灰色,8,猫1

我对此运行查询以仅选择犬类,我想将其保存到 CSV 文件:

ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); 
BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env); 
String inputPath = "location-of-source-file"; 
CsvTableSource petsTableSource = CsvTableSource.builder() 
.path(inputPath) 
.ignoreFirstLine() 
.fieldDelimiter(",") 
.field("id", Types.INT()) 
.field("species", Types.STRING()) 
.field("color", Types.STRING()) 
.field("weight", Types.DOUBLE()) 
.field("name", Types.STRING()) 
.build(); 
// Register our table source 
tableEnv.registerTableSource("pets", petsTableSource); 
Table pets = tableEnv.scan("pets"); 
Table counts = pets 
.groupBy("species") 
.select("species, species.count as count") 
.filter("species === 'canine'"); 
// Convert to Dataset and display results
DataSet<Row> result = tableEnv.toDataSet(counts, Row.class); 
result.print(); 
// Write Results to File 
TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets.csv", ","); 
counts.writeToSink(sink); 

当我运行它时,我看到数据集的结果被输出:
犬,2

但是我在输出文件中没有得到任何结果,我看到了这些 下面的错误。我该怎么做才能解决这个问题?谢谢!

2018-05-27 13:29:17,040 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1873( - class org.apache.flink.types.Row 不包含字段字段的 getter 2018-05-27 13:29:17,040 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1876( - class org.apache.flink.types.Row 不包含字段字段的 setter 2018-05-27 13:29:17,040 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1911( - class org.apache.flink.types.Row 不是有效的 POJO 类型,因为并非所有字段都是有效的 POJO 字段。 2018-05-27 13:29:17,047 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1873( - class org.apache.flink.types.Row 不包含字段字段的 getter 2018-05-27 13:29:17,047 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1876( - class org.apache.flink.types.Row 不包含字段字段的 setter 2018-05-27 13:29:17,047 信息 [主要] 类型utils.TypeExtractor (TypeExtractor.java:1911( - class org.apache.flink.types.Row 不是有效的 POJO 类型,因为并非所有字段都是有效的 POJO 字段。

您可能缺少一个

env.execute(); 

counts.writeToSink(sink);

与立即触发执行的print()相反,writeToSink()只是附加一个接收器运算符并需要显式触发执行。

TypeExtractorINFO消息"只是"告诉您,Row不能用作POJO类型,但这在这里很好。

相关内容

  • 没有找到相关文章

最新更新