我正在读取一个有很多空格的文件,需要过滤掉空间。之后,我们需要将其转换为数据帧。示例输入如下。
2017123 ¦ ¦10¦running¦00000¦111¦-EXAMPLE
我的解决方案是以下函数,它解析出所有空格并修剪文件。
def truncateRDD(fileName : String): RDD[String] = {
val example = sc.textFile(fileName)
example.map(lines => lines.replaceAll("""[tp{Zs}]+""", ""))
}
但是,我不确定如何将其放入数据帧中。sc.textFile
返回一个RDD[String]
。我尝试了案例类的方式,但问题是我们有 800 个字段模式,案例类不能超过 22 个。
我正在考虑以某种方式将RDD[String]转换为RDD[Row],以便我可以使用createDataFrame
函数。
val DF = spark.createDataFrame(rowRDD, schema)
关于如何做到这一点的任何建议?
首先将字符串拆分/解析到字段中。
rdd.map( line => parse(line))
,解析是一些解析函数。 它可以像拆分一样简单,但您可能需要更强大的东西。 这将使您获得RDD[Array[String]]
或类似内容。
然后,您可以使用rdd.map(a => Row.fromSeq(a))
转换为RDD[Row]
从那里你可以转换为数据帧sqlContext.createDataFrame(rdd, schema)
其中rdd是你的RDD[Row]
,模式是你的模式结构类型。
在您的情况下,简单的方法:
val RowOfRDD = truncateRDD("yourfilename").map(r => Row.fromSeq(r))
如果您使用的是 scala 2.10productarity
如何解决这个问题?
但是,我不确定如何将其放入数据帧中。
sc.textFile
返回一个 RDD[字符串]。我尝试了案例类的方式,但问题是我们 有 800 个字段架构,案例类不能超过 22 个。
是的,有一些限制,比如productarity
但我们可以克服...... 您可以像以下示例一样<版本2.11:>
准备一个extends Product
和重写方法的案例类。
喜欢。。。
-
productArity():Int:
这将返回属性的大小。在我们的例子中,它是33。因此,我们的实现如下所示: -
productElement(n:Int):Any:
给定一个索引,这将返回属性。作为保护,我们还有一个默认情况,它会引发一个IndexOutOfBoundsException
异常: -
canEqual (that:Any):Boolean
:这是三个函数中的最后一个,当对类进行相等性检查时,它用作边界条件:
- 示例实现,您可以参考此学生案例类,其中包含 33 个字段
- 此处为示例学生数据集说明