我正在使用Spark 1.6和Scala。 我在这里寻找,但没有找到明确的答案 我有一个大文件,在过滤包含一些版权的第一行后,我想获取标题(104 个字段)并将其转换为StructType
架构。 我想使用一个类扩展Product
特征来定义Dataframe
的架构,然后根据该模式将其转换为Dataframe
。
最好的方法是什么。
这是我文件中的示例:
text (06.07.03.216) COPYRIGHT © skdjh 2000-2016
text 160614_54554.vf Database 53643_csc Interface 574 zn 65
Start Date 14/06/2016 00:00:00:000
End Date 14/06/2016 00:14:59:999
State "s23"
cin. Nb Start End Event Con. Duration IMSI
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
不想像这个模式一样将其转换为SparkSQL
----------------------------------------------------------------------------------------
| cin_Nb | Start | End | Event | Con_Duration | IMSI |
| ----------------------------------------------------------------------------------------|
| 32055680 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
| 32055680 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
| 32055680 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
| 20556800 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
| 32055680 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
----------------------------------------------------------------------------------------
不幸的是,您不能使用案例类或 StructType 架构!原因是 scala 不支持超过 22 个部分的元组,这两种方法都在幕后使用元组。由于您的列超过 22 列,因此该方法不起作用。
但是,您仍然可以这样做,只是没有那么好:)您需要做的是将其转换为单列数据帧,并将该列称为有意义的名称,例如"raw">
val df = rdd.toDF("raw")
接下来,您需要定义一个函数来为任何给定列提取所需的列:
val extractData(idx: Long) = udf[String, String, Int](raw => ???)
现在,您需要使用此函数附加所需的每列。
val columns = yourColumnNamesList.zipWithIndex
val df2 = columns.foldLeft(df){case (acc,(cname,cid)) => acc.withColumn(cname, extractData(cid)($"raw")}
虽然 foldLeft 看起来有点可怕,但如果您查看执行计划器创建的计划,spark 足够聪明,可以将所有这些扁平化为单个映射步骤,并且吞吐量比您预期的要好。
最后,您可以删除原始数据,因为它不再需要了。
df2.drop("raw")
或者!
如果你的数据在文件系统上是分隔格式的,你应该看看DataBricks csv解析器,它也适用于1.6:-)
您可以使用zipwithindex,然后过滤第一行 您可以使用类来检查标头
class convert( var cin_Nb:String, var start:String, var end:String,
var event:String, var duration:String, var zed:String,......)extends Product with Serializable {
def canEqual(that: Any) = that.isInstanceOf[convert]
def productArity = 104
def productElement(idx: Int) = idx match {
case 0 => cin_Nb;case 1 => start;case 2 => end;
case 3 => event;case 4 => duration;case 5 => zed;
..........
}
}
并使用此结构将 RDD 转换为数据帧