When we write Flink's processed JSON data to a file using the writeAsCsv(path, writeMode) method, we need a comma inserted after each JSON record, but this is not happening. We are using Apache Kafka as the data source for the stream.
DataStream<Tuple5<String, String, String, String, String>> messageStream = env
        .addSource(new FlinkKafkaConsumer08<>(FLINK_TOPIC, new SimpleStringSchema(), properties))
        .flatMap(new StreamToTuple5())
        .keyBy(0);
String path = "/home/user/Documents/docs/csvfile/";
messageStream.writeAsCsv(path, FileSystem.WriteMode.OVERWRITE);
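(StreamToTuple5 is our own flatMap and is not shown here; purely as a hypothetical illustration, a sketch that would produce five-field tuples consistent with the JSON-looking lines in the output below could simply split each Kafka message on commas:)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.util.Collector;

// Hypothetical sketch only: split the raw JSON string into its five
// "key":"value" parts, matching the JSON-like lines shown in the output.
public class StreamToTuple5
        implements FlatMapFunction<String, Tuple5<String, String, String, String, String>> {
    @Override
    public void flatMap(String json, Collector<Tuple5<String, String, String, String, String>> out) {
        String[] parts = json.split(",");
        if (parts.length == 5) {
            out.collect(new Tuple5<>(parts[0], parts[1], parts[2], parts[3], parts[4]));
        }
    }
}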
The output produced by this call is:
{"temperSensorData":"28.489084691371364","temperSensorUnit":"celsius","timestamp":"1493270680759","timestamp2":"1493270680786","timestamp3":"1493270680787"}
{"temperSensorData":"28.489084691371467","temperSensorUnit":"celsius","timestamp":"1493270680761","timestamp2":"1493270680816","timestamp3":"1493270680816"}
You can solve the problem by specifying ',' as the row delimiter:
messageStream.writeAsCsv(path, FileSystem.WriteMode.OVERWRITE, ",", ",");
Here the third argument is the row delimiter and the fourth is the field delimiter.
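For completeness, a minimal sketch of the sink call wrapped in a small helper, assuming Flink's four-argument DataStream.writeAsCsv(path, writeMode, rowDelimiter, fieldDelimiter) overload; using ",\n" instead of "," as the row delimiter is a variation that keeps each record on its own line while still appending the comma:

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical helper illustrating the four-argument overload.
public static void writeAsCommaSeparatedCsv(
        DataStream<Tuple5<String, String, String, String, String>> messageStream, String path) {
    messageStream.writeAsCsv(
            path,
            FileSystem.WriteMode.OVERWRITE,
            ",\n",  // 3rd argument: row delimiter, written after every record
            ",");   // 4th argument: field delimiter between the five tuple fields
}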