writeAsCSV()和writeAsText()是意外的



我通过Scala API使用apache flink,在某些时候我获得了DataSet[(Int, Int, Int)]。使用writeAsCSV()writeAsText()方法的结果是出乎意料的。它创建了一个目录。该目录的位置和名称是方法调用的第一个参数(例如filePath )。在该目录中出现了两个文件名为"1"one_answers"2"的文件。在这些文件中,我可以看到DataSets数据。他们似乎把DataSets的内容分成了这两个文件。试图重新创建此行为以显示更简洁的代码片段,我不能。也就是说,我看到在预期位置创建了一个具有预期名称的文件,而没有创建目录。val mas = ma_ groupBy(0,1) sum(2)mas.writeAsCsv (c: flink mas.csv)

将导致创建一个名为"mas.csv"的目录,其中包含两个文件"1"one_answers"2"。什么时候会发生这种事?使用flink 9.1本地模式,Windows 7, scala 2.10, eclipse3.0.3

这是预期的行为。如果希望获得单个输出文件,则需要将接收器的并行度设置为1。

dataset = dataset.writeAsCsv("filename").setParallelism(1);

对于DataStream API,您需要插入一个额外的rebalane()来打破操作符链。否则,整个链将执行dop=1或setParallelism()可能被忽略。

datastream = datastream.rebalance().writeAsCsv("filename").setParallelism(1);

相关内容

  • 没有找到相关文章

最新更新