Spark 使用类将 rdd 转换为数据帧



我正在使用Spark 1.6和Scala。 我在这里寻找,但没有找到明确的答案 我有一个大文件,在过滤包含一些版权的第一行后,我想获取标题(104 个字段)并将其转换为StructType架构。 我想使用一个类扩展Product特征来定义Dataframe的架构,然后根据该模式将其转换为Dataframe

最好的方法是什么。

这是我文件中的示例:

text  (06.07.03.216)  COPYRIGHT © skdjh 2000-2016
text  160614_54554.vf Database    53643_csc   Interface   574 zn  65
Start   Date    14/06/2016  00:00:00:000
End Date    14/06/2016  00:14:59:999
State   "s23"
cin. Nb      Start     End        Event       Con. Duration  IMSI
32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx

不想像这个模式一样将其转换为SparkSQL

----------------------------------------------------------------------------------------
|    cin_Nb |  Start            |   End          |      Event   |   Con_Duration  | IMSI  |
| ----------------------------------------------------------------------------------------|
|   32055680 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx |
|   32055680 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx |
|   32055680 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx |
|   20556800 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx |
|   32055680 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx | 
----------------------------------------------------------------------------------------

不幸的是,您不能使用案例类或 StructType 架构!原因是 scala 不支持超过 22 个部分的元组,这两种方法都在幕后使用元组。由于您的列超过 22 列,因此该方法不起作用。

但是,您仍然可以这样做,只是没有那么好:)您需要做的是将其转换为单列数据帧,并将该列称为有意义的名称,例如"raw">

val df = rdd.toDF("raw")

接下来,您需要定义一个函数来为任何给定列提取所需的列:

val extractData(idx: Long) = udf[String, String, Int](raw => ???)

现在,您需要使用此函数附加所需的每列。

val columns = yourColumnNamesList.zipWithIndex
val df2 = columns.foldLeft(df){case (acc,(cname,cid)) => acc.withColumn(cname, extractData(cid)($"raw")}

虽然 foldLeft 看起来有点可怕,但如果您查看执行计划器创建的计划,spark 足够聪明,可以将所有这些扁平化为单个映射步骤,并且吞吐量比您预期的要好。

最后,您可以删除原始数据,因为它不再需要了。

df2.drop("raw")

或者!

如果你的数据在文件系统上是分隔格式的,你应该看看DataBricks csv解析器,它也适用于1.6:-)

您可以使用zipwithindex,然后过滤第一行 您可以使用类来检查标头

class convert( var  cin_Nb:String, var start:String, var end:String, 
var event:String, var duration:String, var zed:String,......)extends Product  with Serializable {    
def canEqual(that: Any) = that.isInstanceOf[convert]    
def productArity = 104   
def productElement(idx: Int) = idx match {
case 0 => cin_Nb;case 1 =>  start;case 2 =>  end;
case 3 =>  event;case 4 =>  duration;case 5 =>  zed;
..........
}
}

并使用此结构将 RDD 转换为数据帧

相关内容

  • 没有找到相关文章

最新更新