我通过Scala API使用apache flink,在某些时候我获得了DataSet[(Int, Int, Int)]
。使用writeAsCSV()
和writeAsText()
方法的结果是出乎意料的。它创建了一个目录。该目录的位置和名称是方法调用的第一个参数(例如filePath
)。在该目录中出现了两个文件名为"1"one_answers"2"的文件。在这些文件中,我可以看到DataSets数据。他们似乎把DataSets的内容分成了这两个文件。试图重新创建此行为以显示更简洁的代码片段,我不能。也就是说,我看到在预期位置创建了一个具有预期名称的文件,而没有创建目录。val mas = ma_ groupBy(0,1) sum(2)mas.writeAsCsv (c: flink mas.csv)
将导致创建一个名为"mas.csv"的目录,其中包含两个文件"1"one_answers"2"。什么时候会发生这种事?使用flink 9.1本地模式,Windows 7, scala 2.10, eclipse3.0.3
这是预期的行为。如果希望获得单个输出文件,则需要将接收器的并行度设置为1。
dataset = dataset.writeAsCsv("filename").setParallelism(1);
对于DataStream API,您需要插入一个额外的rebalane()
来打破操作符链。否则,整个链将执行dop=1或setParallelism()
可能被忽略。
datastream = datastream.rebalance().writeAsCsv("filename").setParallelism(1);