def toDouble(s: String) = {
  if ("?".equals(s)) Double.NaN else s.toDouble
}

def parse(line: String) = {
  val pieces = line.split(',')
  val id1 = pieces(0).toInt
  val scores = pieces.slice(2, 11).map(toDouble)
  val matched = pieces(11).toBoolean
  MatchData(id1, scores, matched)
}

case class MatchData(id1: Int, scores: Array[Double], matched: Boolean)
val inputrdd = spark.sparkContext.textFile("../donation/block_*.csv")
val noheader = inputrdd.filter(x => !x.contains("id_1"))
val df = noheader.map(line => parse(line)).toDF()
The schema of the DataFrame is as follows:
root
 |-- id1: integer (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: double (containsNull = false)
 |-- matched: boolean (nullable = true)
The first three records look like this:
[53113,WrappedArray(0.833333333333333, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 0.0),true]
[47614,WrappedArray(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0),true]
[70237,WrappedArray(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0),true]
I want to get summary statistics such as count, mean, max, and min for each element of the scores array. My idea was to create another DataFrame by filtering out the NaN values and aliasing each column with the index of the array element, then using a select on that DataFrame with functions like count(), min(), and max(). But it did not produce any results.
val dfnona = (0 until 9).map(i => {
  df.select("scores").as[Seq[Double]].filter(s => s(i) != Double.NaN).alias(i.toString)
})
dfnona.select(count("0"), mean("0"), stddev_pop("0"), max("0"), min("0")).show()
Could someone give me some pointers on how to achieve this?
You can use this solution if the scores array has a fixed length across the entire dataset.
val df = ... // create the DataFrame with the schema you mentioned

// Here $"scores"(0) fetches the first element of the scores array
val subjects = df.withColumn("sub1", $"scores"(0))
  .withColumn("sub2", $"scores"(1))
  .withColumn("sub3", $"scores"(2))
  .withColumn("sub4", $"scores"(3))
  .select("sub1", "sub2", "sub3", "sub4")
// Alternate approach: fold over the indices instead of chaining withColumn by hand
val numberOfSubjects = 4
val subjects = (0 until numberOfSubjects).foldLeft(df) { (accDf, index) =>
  accDf.withColumn(s"sub${index + 1}", $"scores"(index))
}
subjects.printSchema()
root
 |-- sub1: double (nullable = true)
 |-- sub2: double (nullable = true)
 |-- sub3: double (nullable = true)
 |-- sub4: double (nullable = true)
Now you can apply all the statistical functions on the sub1, sub2, sub3, sub4 columns.
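As a sketch of that last step (assuming the subjects DataFrame built above, and that the NaN sentinel values must be filtered per column, since Spark's aggregate functions skip nulls but propagate NaN):

```scala
import org.apache.spark.sql.functions._

// For each flattened column, drop the NaN rows and compute all
// the requested statistics in a single aggregation pass.
Seq("sub1", "sub2", "sub3", "sub4").foreach { c =>
  subjects.filter(!isnan(col(c)))
    .agg(count(c), mean(c), stddev_pop(c), max(c), min(c))
    .show()
}
```

Filtering inside the loop means each column keeps its own non-NaN row set, which matches the per-element statistics asked for in the question.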
Here is the code I came up with. This post helped, as did mrsrinivas's answer above.
val dfs = (0 until 9).foldLeft(parsed) { case (df, i) =>
  df.withColumn(s"score${i + 1}", $"scores"(i))
}
val scores = dfs.drop("scores")
for (x <- scores.columns.filter(colname => colname.contains("score"))) {
  dfs.filter(!isnan(dfs.col(x))).describe(x).show()
}
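A possible variation (my suggestion, not from the answers above): replace NaN with null once, then a single describe() call covers every score column in one pass, since Spark's aggregates already ignore nulls.

```scala
import org.apache.spark.sql.functions._

// Assumes the dfs DataFrame with score1..score9 columns built above.
val scoreCols = (1 to 9).map(i => s"score$i")

// Map NaN to null in each score column so the aggregates skip those rows.
val cleaned = scoreCols.foldLeft(dfs) { (acc, c) =>
  acc.withColumn(c, when(isnan(col(c)), lit(null)).otherwise(col(c)))
}

// One describe() over all nine columns instead of nine separate show() calls.
cleaned.describe(scoreCols: _*).show()
```

This trades nine small jobs for one wider aggregation, which is usually cheaper on a real cluster.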