将RDD另存为文本文件会给出FileAlreadyExist异常.如何在每次程序加载时创建新文件并使用FileUtils



代码:

val badData:RDD[ListBuffer[String]] = rdd.filter(line => line(1).equals("XX") || line(5).equals("XX"))
badData.coalesce(1).saveAsTextFile(propForFile.getString("badDataFilePath"))

第一次程序运行正常。再次运行时,它会为文件"已存在"引发异常。 我想使用FileUtilsjava 功能解决此问题并将 rdd 另存为文本文件。

在将文件写入指定路径之前,请删除已存在的路径。

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(bad/data/file/path), true)

然后执行通常的写入过程。希望这应该解决问题。

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
val fs = spark.SparkContext.hadoopCofigurations
if (fs.exists(new Path(path/to/the/files)))
fs.delete(new Path(path/to/the/files), true)

将文件名作为 String 传递给方法,如果存在目录或文件,它将删除。在将这段代码写入输出路径之前,请使用这段代码。

为什么不使用数据帧?将RDD[ListBuffer[String]放入RDD[Row]- 类似 -

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
val badData:RDD[ListBuffer[String]] = rdd.map(line => 
Row(line(0), line(1)... line(n))
.filter(row => filter stuff)
badData.toDF().write.mode(SaveMode.Overwrite)

最新更新