Spark: aggregation based on columns



I have a file consisting of 3 fields (Emp_ids, Group, Salary)

  • 100 A 430
  • 101 A 500
  • 201 B 300

I want to get the following results:

1) group name and count(*)

2) group name and max(salary)

import org.apache.spark.{SparkConf, SparkContext}

val myfile = "/home/hduser/ScalaDemo/Salary.txt"
val conf = new SparkConf().setAppName("Salary").setMaster("local[2]")
val sc = new SparkContext(conf)
val sal = sc.textFile(myfile)  // RDD[String], one line per record

Scala DSL:

import org.apache.spark.sql.functions.{count, max}
import sqlContext.implicits._  // enables the $"column" syntax

case class Data(empId: Int, group: String, salary: Int)
// parse each whitespace-separated line of sal into a typed row
val df = sqlContext.createDataFrame(sal.map { v =>
  val arr = v.split(' ').map(_.trim())
  Data(arr(0).toInt, arr(1), arr(2).toInt)
})
df.show()
+-----+-----+------+
|empId|group|salary|
+-----+-----+------+
|  100|    A|   430|
|  101|    A|   500|
|  201|    B|   300|
+-----+-----+------+
df.groupBy($"group").agg(count("*") as "count").show()
+-----+-----+
|group|count|
+-----+-----+
|    A|    2|
|    B|    1|
+-----+-----+
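
As a side note, grouped counting also has a built-in shorthand in the DataFrame API, count() on the grouped data, which produces the same count column:

df.groupBy($"group").count().show()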

df.groupBy($"group").agg(max($"salary") as "maxSalary").show()
+-----+---------+
|group|maxSalary|
+-----+---------+
|    A|      500|
|    B|      300|
+-----+---------+
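
If both results are needed at once, agg accepts multiple aggregate expressions, so the two queries above can be combined into a single pass over the data; a minimal sketch:

df.groupBy($"group")
  .agg(count("*") as "count", max($"salary") as "maxSalary")
  .show()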

Or using plain SQL:

df.registerTempTable("salaries")
sqlContext.sql("select group, count(*) as count from salaries group by group").show()
+-----+-----+
|group|count|
+-----+-----+
|    A|    2|
|    B|    1|
+-----+-----+
sqlContext.sql("select group, max(salary) as maxSalary from salaries group by group").show()
+-----+---------+
|group|maxSalary|
+-----+---------+
|    A|      500|
|    B|      300|
+-----+---------+
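
The same combined form works in plain SQL against the salaries temp table registered above:

sqlContext.sql("select group, count(*) as count, max(salary) as maxSalary from salaries group by group").show()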

Although Spark SQL is recommended for this kind of aggregation for performance reasons, it can also be done easily with the RDD API:

val rdd = sc.parallelize(Seq(Data(100, "A", 430), Data(101, "A", 500), Data(201, "B", 300)))
rdd.map(v => (v.group, 1)).reduceByKey(_ + _).collect()
res0: Array[(String, Int)] = Array((B,1), (A,2))
rdd.map(v => (v.group, v.salary)).reduceByKey((s1, s2) => if (s1 > s2) s1 else s2).collect()
res1: Array[(String, Int)] = Array((B,300), (A,500))
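
Both statistics can likewise be computed in a single reduceByKey pass by carrying a (count, max) pair per group; a minimal sketch:

rdd.map(v => (v.group, (1, v.salary)))
   .reduceByKey { case ((c1, m1), (c2, m2)) => (c1 + c2, math.max(m1, m2)) }
   .collect()
// e.g. Array((B,(1,300)), (A,(2,500))) for the sample data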
