DataStream
的writeAsText
或writeAsCsv
方法写入的文件与工作线程一样多。据我所知,这些方法只允许您指定这些文件的路径和一些格式化。
为了调试和测试的目的,能够将所有内容打印到单个文件中,而不必将设置更改为具有单个工作线程,这将是非常有用的。
是否有不太复杂的方法来实现这一点?我怀疑应该有可能实现自定义SinkFunction
,但不确定那个(此外,对于一些看起来相对简单的东西来说,它也感觉像一个麻烦)。
您可以通过将并行度设置为1来实现这一点。这样,写入操作只在一台机器上进行。
writeAsText(path).setParallelism(1);
在Flink 1.13中,不再使用writeAsText函数,因为它已被弃用。
可以在这里看到现在StreamingFileSink类和addSink操作应该使用。关于将并行度设置为1,这也是不同的(通过使用setParallelism方法将StreamExecutionEnvironment的并行度设置为1)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
.build()
dataStream.map(_.toString).addSink(sink)