如何在Spark中读取嵌套集合



我有一张镶木地板桌子,其中一列是

,数组<结构<col1,col2,。。colN>>

可以使用LATERAL VIEW语法在配置单元中对此表运行查询。

如何将这个表读取到RDD中,更重要的是如何在Spark中过滤、映射等这个嵌套集合?

在Spark文档中找不到对此的任何引用。提前感谢提供任何信息!

ps。我觉得在桌面上提供一些统计数据可能会有所帮助。主表中的列数~600。行数~200m。嵌套集合中的"列"数约为10。嵌套集合中的平均记录数~35。

在嵌套集合的情况下没有魔法。Spark将以与RDD[(String, String)]RDD[(String, Seq[String])]相同的方式处理。

不过,从Parquet文件中读取这样的嵌套集合可能很棘手。

让我们以spark-shell(1.3.1):为例

scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

编写拼花地板文件:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")

阅读拼花地板文件:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   
scala> val outers = dataFrame.map { row =>
|   val key = row.getString(0)
|   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
|   Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

其中最重要的部分是row.getAs[Seq[Row]](1)struct的嵌套序列的内部表示是ArrayBuffer[Row],您可以使用它的任何超类型来代替Seq[Row]1是外部行中的列索引。我在这里使用了getAs方法,但在Spark的最新版本中也有其他方法。请参阅Row特性的源代码。

现在您有了RDD[Outer],您可以应用任何需要的转换或操作。

// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))

请注意,我们使用spark SQL库只是为了读取镶木地板文件。例如,在将DataFrame映射到RDD之前,您可以直接在DataFrame上只选择所需的列。

dataFrame.select('col1, 'col2).map { row => ... }

我将给出一个基于Python的答案,因为这就是我正在使用的。我认为Scala也有类似的东西。

根据Python API文档,Spark 1.4.0中添加了explode函数,用于处理DataFrames中的嵌套数组。

创建测试数据帧:

from pyspark.sql import Row
df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()
## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+

使用explode使列表列变平:

from pyspark.sql.functions import explode
df.select(df.a, explode(df.intlist)).show()
## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+

另一种方法是使用这样的模式匹配:

val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
case List(a:String, b: String) => (a, b)
}).toList
})

您可以直接在Row上进行模式匹配,但由于一些原因,它可能会失败。

以上答案都是很好的答案,从不同的角度解决了这个问题;Spark SQL也是访问嵌套数据的非常有用的方法。

下面是如何在SQL中直接使用explode()查询嵌套集合的示例。

SELECT hholdid, tsp.person_seq_no 
FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
FROM disc_mrt.unified_fact uf
)

tsp_ids是一个嵌套的structs,它有很多属性,包括我在上面的外部查询中选择的person_seq_no。

以上内容已在Spark 2.0中进行了测试。我做了一个小测试,它在Spark 1.6中不起作用。这个问题是在Spark 2不存在的时候提出的,所以这个答案很好地增加了处理嵌套结构的可用选项列表。

还可以查看以下JIRA,以获得使用LATERAL VIEW OUTER语法查询嵌套数据的Hive兼容方式,因为Spark 2.2也支持OUTER爆炸(例如,当嵌套集合为空,但您仍然希望具有父记录中的属性时):

  • SPARK-13721:添加对LATERAL VIEW OUTER explode()的支持

通知未在explode()上解析用于SQL访问的JIRA:

  • SPARK-7549:支持嵌套字段聚合

相关内容

  • 没有找到相关文章

最新更新