Scala-如何合并HDFS位置的增量文件



我的要求是我有一个多HDFS位置,每小时从Kafka接收文件。因此,对于每个目录,如何将特定时间戳的所有文件合并为单个镶木地板文件,并在下次将文件从上次合并的时间戳合并为当前时间戳,并在将来重复相同操作。这就是我在Spark Scala工作中所要做的,所以不能使用普通的shell脚本。欢迎提出任何建议。

这里有一个代码片段,可以帮助完成任务。

第一步是以Map的形式获取每个日期的文件列表。(Map[String, List[String]]),其中键为Date,值为具有相同日期的文件列表。日期取自HDFS文件的修改时间戳。

注意:使用本地路径测试代码,根据需要提供正确的HDFS路径/url。

在编写输出时,没有直接的选项来指定目标文件名,但您可以指定特定于每个日期的目标目录。代码使我们使用FileSystem API将文件重命名为所需文件,并删除按日期创建的临时输出文件夹。

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkContext
import org.joda.time.format.DateTimeFormat

object MergeFiles {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Merging files day wise in a directory")
.master("local[2]")
.getOrCreate()
val inputDir = "/Users/sujesh/test_data"
val outputDir = "/Users/sujesh/output_data"
val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
val filesPerDate = getFiles(inputDir, fs)
filesPerDate
.foreach { m =>
spark
.read
.format("csv")
.option("inferSchema", false)
.option("header", false)
.load(m._2:_*)
.repartition(1)
.write
.format("csv")
.save(s"$outputDir/${m._1}")
val file = fs.globStatus(new Path(s"$outputDir/${m._1}/part*.csv"))(0).getPath.getName
fs.rename(new Path(s"$outputDir/${m._1}/$file"), new Path(s"$outputDir/${m._1}.csv"))
fs.delete(new Path(s"$outputDir/${m._1}"), true)
}
}
/*
Get the list of files group by date
date is taken from file's modification timestamp
*/
def getFiles(dir: String, fs: FileSystem) = {
fs
.globStatus(new Path(s"$dir/*.csv"))
.map { f: FileStatus =>
(DateTimeFormat.forPattern("yyyyMMdd").print(f.getModificationTime), f.getPath.toUri.getRawPath)
}.groupBy(_._1)
.map { case (k,v) => (k -> v.map(_._2).toSeq) }
}
}

您可以在测试后进一步优化代码,并在必须重新使用的情况下将文件重命名代码转换为util。已将所有选项(如inferSchemaheader(设置为false。根据需要使用它们。这种方法也适用于其他格式的文件。

注意:如果您在同一目录中重复执行此过程,则需要进一步调整,因为新创建的文件将具有最新的时间戳。因此,如果这不是每天运行的,您也需要显式更新文件的修改时间戳,或者忽略具有文件名模式(如yyyyMMdd.csv(的文件

最新更新