Spark 将多个目录读入多个数据帧



我在 S3 上有一个目录结构,如下所示:

foo
  |-base
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....
  |-A
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....
  |-B
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....

这意味着对于目录foo,我根据作业的时间戳在给定路径中有多个输出表,baseAB等。

我想根据时间戳和主目录left join它们,在本例中为 foo.这意味着在每个输出表中读取baseAB等,以读取新的单独输入表,并在其上应用left join。全部以base表为起点

像这样的东西(不是工作代码!

val dfs: Seq[DataFrame] = spark.read.orc("foo/*/2017/01/04/*")
val base: DataFrame = spark.read.orc("foo/base/2017/01/04/*")
val result = dfs.foldLeft(base)((l, r) => l.join(r, 'id, "left"))

有人可以指出我如何获取该数据帧序列的正确方向吗?甚至可能值得将读取视为延迟或顺序读取,因此仅在应用联接以减少内存需求时读取AB表。

注意:目录结构不是最终的,这意味着如果适合解决方案,它可以更改。

我所知,Spark使用底层Hadoop API来读取数据文件。因此,继承的行为是将您指定的所有内容读取到一个RDD/数据帧中。

要实现您想要的,您可以首先获取目录列表:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ FileSystem, Path }
    val path = "foo/"
    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)
    val paths: Array[String] = fs.listStatus(new Path(path)).
      filter(_.isDirectory).
      map(_.getPath.toString)

然后将它们加载到单独的数据帧中:

    val dfs: Array[DataFrame] = paths.
      map(path => spark.read.orc(path + "/2017/01/04/*"))

这是一个直接的解决方案,可以解决您正在尝试做的事情(我认为(,不使用Hive或内置分区功能等额外功能:

import spark.implicits._
// load base
val baseDF = spark.read.orc("foo/base/2017/01/04").as("base")
// create or use existing Hadoop FileSystem - this should use the actual config and path
val fs = FileSystem.get(new URI("."), new Configuration())
// find all other subfolders under foo/
val otherFolderPaths = fs.listStatus(new Path("foo/"), new PathFilter {
  override def accept(path: Path): Boolean = path.getName != "base"
}).map(_.getPath)
// use foldLeft to join all, using the DF aliases to find the right "id" column
val result = otherFolderPaths.foldLeft(baseDF) { (df, path) =>
  df.join(spark.read.orc(s"$path/2017/01/04").as(path.getName), $"base.id" === $"${path.getName}.id" , "left") }

最新更新