从Flink到文件的JSON以JSON的形式编写数据



当我们尝试使用方法 writeascsv(path,writemode(的方法将flink的处理后的json数据写入文件中时,在没有发生的每个JSON之后,我们需要插入逗号。我们正在使用apache kafka作为数据源来倾斜。

DataStream<Tuple5<String, String, String, String, String>> messageStream = env.addSource(new FlinkKafkaConsumer08<>(FLINK_TOPIC, new SimpleStringSchema(), properties)).flatMap(new StreamToTuple5()).keyBy(0);
String path = "/home/user/Documents/docs/csvfile/";
messageStream.writeAsCsv(path, FileSystem.WriteMode.OVERWRITE);

执行此方法获得的输出是

{"temperSensorData":"28.489084691371364","temperSensorUnit":"celsius","timestamp":"1493270680759","timestamp2":"1493270680786","timestamp3":"1493270680787"}
{"temperSensorData":"28.489084691371467","temperSensorUnit":"celsius","timestamp":"1493270680761","timestamp2":"1493270680816","timestamp3":"1493270680816"}

您可以通过将','指定为行定界线来解决问题:

messageStream.writeAsCsv(path, FileSystem.WriteMode.OVERWRITE, ",", ",")

这里的第三个参数是行定界符。

相关内容

  • 没有找到相关文章

最新更新