我使用Flink(1.7.1和Hadoop)进行了一些数据处理。最后,我想将由2个元组组成的数据集写入一个文件。目前,我是这样做的:
<Tuple2<Integer, Point>> pointsClustered = points.getClusteredPoints(...);
pointsClustered.writeAsCsv(params.get("output"), "n", ",");
但是,我希望将CSV标头写入第一行。Flink的Javadoc API没有说明任何选项。此外,我在谷歌上找不到任何解决方案。
你能就如何做到这一点提出建议吗。非常感谢!
Flink自己的CsvOutputFormat
不支持此功能。您可以做的是扩展CsvOutputFormat
并覆盖open
方法,该方法在打开格式时写入标头。然后使用DataSet#output
指定新创建的输出格式:
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Integer> input = env.fromElements(1, 2, 3);
DataSet<Tuple3<Integer, String, Double>> result = input.map((MapFunction<Integer, Tuple3<Integer, String, Double>>) integer -> Tuple3.of(integer, integer.toString(), 42.0));
Path outputPath = new Path("hdfs:///foobar");
result.output(new MyCsvOutputFormat(outputPath));
env.execute();
}
private static class MyCsvOutputFormat<T extends Tuple> extends CsvOutputFormat<T> {
public MyCsvOutputFormat(Path outputPath) {
super(outputPath);
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
try (PrintWriter wrt = new PrintWriter(stream)) {
wrt.println("Foo|bar|foobar");
}
super.open(taskNumber, numTasks);
}
}
我可以通过使用并集向数据集添加一个头行来绕过限制。这样,第一行将始终是导出的标题行。
DataSet<Tuple8<String, String, String, String, String, String, String, String>> headers = env.fromElements(
Tuple8.of(
"SDMId", "ActivityType", "ActionType", "ActivityId", "ActivityLevel", "Timestamp", "SessionId", "Value"
));
DataSet<Tuple8<String, String, String, String, String, String, String, String>> results =
headers.union(skillResults);
results.writeAsCsv("file:///Users/karthsub/Desktop/first_export.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);