我正在Apache Spark中的Scala Shell中进行实验。我有一个带有值列表的文本文件,我想找到特定列的平均值。我的input.txt文件如下所示。(这不是整个文件,而是示例。)
1 12.4 12.5 18.9 19.9
2 1.7 1.9
3 11.99 1.9 8.9 12.90978933
2 89.987 7.99 12.898980800000
1 12.8 1.88 1.8
2 1.9 1.8 1.8979 1.808888
我想在第一列中找到每列第五列的平均值。例如,假设这些是一组学生ID和标记。对于每个学生ID,我想找到最后一个主题的标记。另请注意,最后一列中缺少某些值。
这是我到目前为止尝试过的代码。
val text = sc.textFile("/neerja/input.txt")
val data = text.flatMap(line => line.split("\t")).map(word => (word,1).reduceByKey(_ + _);
我想获得最后一列并找到平均值。作为第一步,我想到了在最后一列中获取所有值。
val fourth = text.map(_.split("\t")(4)).collect
但这给了我ArrayIndexOutOfBoundException
。我怀疑发生这种情况是因为上一列中缺少某些值。请帮助我找到最后一列的平均值。任何帮助将不胜感激。
您可以简单地执行以下
val text = sc.textFile("/neerja/input.txt")
val fourth = text.map(line => line.split("\t"))
.map(arr => Try(arr(4).toDouble) getOrElse(0.0)).mean()
println(fourth)
您应该获得第五列主题的平均值
更新
如果需要所有主题列的平均值,我建议您创建dataframe
。Dataframe
s是优化的RDD
,许多内置功能可用于计算。
用于为给定的数据创建dataframe
,您将需要schema
。
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}
val schema = StructType(Seq(
StructField("Sn", IntegerType, true),
StructField("subject1", DoubleType, true),
StructField("subject2", DoubleType, true),
StructField("subject3", DoubleType, true),
StructField("subject4", DoubleType, true)
))
RDD[Row]
需要创建为
val data = text.map(line => line.split("\t"))
.map(arr => Row.fromSeq(Seq(arr(0).toInt, Try(arr(1).asInstanceOf[DoubleType]) getOrElse(0.0),Try(arr(2).toDouble) getOrElse(0.0),Try(arr(3).toDouble) getOrElse(0.0),Try(arr(4).toDouble) getOrElse(0.0))))
最终创建了数据框
val df = sqlContext.createDataFrame(data, schema)
每列的平均可以通过使用mean
函数作为
df.select(mean("subject1").as("averageOFS1"),mean("subject2").as("averageOFS2"),mean("subject3").as("averageOFS3"),mean("subject4").as("averageOFS4")).show(false)
应该给您dataframe
+------------------+-----------------+-----------+-----------------+
|averageOFS1 |averageOFS2 |averageOFS3|averageOFS4 |
+------------------+-----------------+-----------+-----------------+
|21.796166666666668|4.661666666666666|5.24965 |7.919609688333335|
+------------------+-----------------+-----------+-----------------+
如果您想尝试一种结构方法,则也可以使用数据框架实现此目的:
object average extends App{
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val x = sparkSession.read
.option("header", "false")
.option("delimiter", "\t")
.option("mode", "FAILFAST")
.csv("...Spark-2.x/src/main/resources/tab_data.csv")
x.printSchema()
x.show(truncate = false)
val df: DataFrame = x.select('_c0 as "id",
'_c1 as "sub1",'_c2 as "sub2",'_c3 as "sub3",'_c4 as "sub4")
df.groupBy('id).agg(avg('sub4)).show()
}