在Flink中,如何将数据流写入单个文件



DataStreamwriteAsTextwriteAsCsv方法写入的文件与工作线程一样多。据我所知,这些方法只允许您指定这些文件的路径和一些格式化。

为了调试和测试的目的,能够将所有内容打印到单个文件中,而不必将设置更改为具有单个工作线程,这将是非常有用的。

是否有不太复杂的方法来实现这一点?我怀疑应该有可能实现自定义SinkFunction,但不确定那个(此外,对于一些看起来相对简单的东西来说,它也感觉像一个麻烦)。

您可以通过将并行度设置为1来实现这一点。这样,写入操作只在一台机器上进行。

writeAsText(path).setParallelism(1);

在Flink 1.13中,不再使用writeAsText函数,因为它已被弃用。

可以在这里看到现在StreamingFileSink类和addSink操作应该使用。关于将并行度设置为1,这也是不同的(通过使用setParallelism方法将StreamExecutionEnvironment的并行度设置为1)

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
  .build()
dataStream.map(_.toString).addSink(sink)

相关内容

  • 没有找到相关文章

最新更新