I have an input.txt file. The data looks like this:
1 1383260400000 0 0.08136262351125882
1 1383260400000 39 0.14186425470242922 0.1567870050390246 0.16093793691701822 0.052274848528573205 11.028366381681026
1 1383261000000 0 0.13658782275823106 0.02730046487718618
1 1383261000000 33 0.026137424264286602
2241 1383324600000 0 0.16869936142032646
2241 1383324600000 39 0.820500491400199 0.6518011299798726 1.658248219576473 3.4506242774863045 36.71096470849049
2241 1383324600000 49 0.16295028249496815
Assume the first column is the ID and the remaining columns are Col1, Col2, Col3, Col4, Col5, Col6, and Col7. For each ID I want to find the average of Col7. Basically I want my result in the format ID, AVG(Col7).
Here is the code I have tried so far. I read my data from the txt file, then I created a schema:
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))
Then I created a DataFrame:
val data = text.map(line => line.split("\t")).map(arr => Row.fromSeq(Seq(
  arr(0).toInt,
  Try(arr(1).asInstanceOf[DoubleType]) getOrElse (0.0),
  Try(arr(2).toInt) getOrElse (0),
  Try(arr(3).toDouble) getOrElse (0.0),
  Try(arr(4).toDouble) getOrElse (0.0),
  Try(arr(5).toDouble) getOrElse (0.0),
  Try(arr(6).toDouble) getOrElse (0.0),
  Try(arr(7).asInstanceOf[DoubleType]) getOrElse (0.0)
)))
Finally I aggregate and save the result as a txt file:
val res1 = df.groupBy("ID").agg(avg("col7"))
res1.rdd.saveAsTextFile("/stuaverage/spoutput12")
When I run this, I get several output files containing blank results, for example:
[1068,0.0]
[1198,0.0]
[1344,0.0]
[1404,0.0]
[1537,0.0]
[1675,0.0]
[1924,0.0]
[193,0.0]
[211,0.0]
[2200,0.0]
[2225,0.0]
[2663,0.0]
[2888,0.0]
[3152,0.0]
[3235,0.0]
The first column is correct, but for the second column I should be getting an actual value (even though some rows have missing values). Please help.
The problem is that you are converting col7 the wrong way: you try to cast the string to DoubleType instead of parsing it to a Scala Double (with .toDouble). Your cast always throws an exception, so col7 always falls back to 0.0. This works:
import scala.util.Try
import org.apache.spark.sql.Row

val rdd = sc.textFile("input.txt")
  .map(line => line.split("\t"))
  .map((arr: Array[String]) => Row(
    arr(0).toInt,
    Try(arr(1).toDouble) getOrElse (0.0),
    Try(arr(2).toInt) getOrElse (0),
    Try(arr(3).toDouble) getOrElse (0.0),
    Try(arr(4).toDouble) getOrElse (0.0),
    Try(arr(5).toDouble) getOrElse (0.0),
    Try(arr(6).toDouble) getOrElse (0.0),
    Try(arr(7).toDouble) getOrElse (0.0)  // Try also covers rows with missing trailing columns
  ))

// build the DataFrame from the RDD using the schema defined above
val df = sqlContext.createDataFrame(rdd, schema)
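To see why your version always produced 0.0, here is a minimal sketch of the two conversions side by side. A String is never an instance of Spark's Catalyst type DoubleType, so the cast always throws a ClassCastException, which Try swallows, returning the getOrElse fallback every time:

import scala.util.Try
import org.apache.spark.sql.types.DoubleType

// the cast always fails, so Try always yields the fallback 0.0:
val viaCast = Try("11.028366381681026".asInstanceOf[DoubleType]).getOrElse(0.0)  // 0.0

// parsing with .toDouble succeeds for well-formed numbers:
val viaParse = Try("11.028366381681026".toDouble).getOrElse(0.0)  // 11.028366381681026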
I would recommend that you use the SQLContext read API instead, together with the schema you have already defined:
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\t")
  .schema(schema)
  .load("path to your text file")
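If you are on Spark 2.x instead, the CSV reader is built in and no external package is needed; a minimal sketch, assuming a SparkSession named spark:

// Spark 2.x: built-in CSV source, reusing the same schema
val df = spark.read
  .option("sep", "\t")
  .schema(schema)
  .csv("path to your text file")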
The schema is:
val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))
After that, all you need to do is apply the avg function to the grouped DataFrame:
import org.apache.spark.sql.functions._
val res1 = df.groupBy("ID").agg(
  avg("col1"), avg("col2"), avg("col3"), avg("col4"),
  avg("col5"), avg("col6"), avg("col7"))
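If you only need the col7 average (as in the question), you can also alias the aggregate so the output header is readable; a small sketch, with res7 and avg_col7 as illustrative names:

// average only col7 and give the result column a readable name
val res7 = df.groupBy("ID").agg(avg("col7").alias("avg_col7"))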
Finally, you can save to CSV directly from the DataFrame; you don't need to convert to an RDD:
res1.coalesce(1).write.csv("/stuaverage/spoutput12")
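Note that DataFrameWriter.csv is the Spark 2.x API. If you are on Spark 1.x with the spark-csv package (which the read above uses), the equivalent write goes through the same data source; a sketch under that assumption:

// Spark 1.x: write through the spark-csv data source
res1.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/stuaverage/spoutput12")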
Try this more concise version (assuming you are working from the spark-shell). It should work:
val df = spark
  .read
  .option("header", "false")
  .option("sep", "\t")
  .option("inferSchema", "true")
  .csv("...input...")
  .toDF("ID", "col1", "col2", "col3", "col4", "col5", "col6", "col7")
val result = df.groupBy("ID").mean("col7")
result
  .write
  .option("header", "true")
  .option("sep", ";")
  .csv("...output...")
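As a quick sanity check, you can confirm that inferSchema really parsed col7 as a double and preview the aggregation before writing; a minimal sketch:

// verify inferred column types (col7 should be double)
df.printSchema()

// preview a few averaged rows without truncation
result.show(5, truncate = false)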