I have the following program, and I want to convert its output into a different format than the one produced by the method in the last section of the program:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object PeTPairing {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("PET").master("local").getOrCreate()
import spark.implicits._
val pet = spark.read.option("header", true).option("inferSchema", true).csv("data/other/pet.csv")
val owners = spark.read.option("header", true).option("inferSchema", true).csv("data/other/owner.csv")
val pet_data = pet.select("id", "owner_id", "breed", "sex")
val animalPair = pet_data.as("a1")
.join(pet_data.as("a2"), $"a1.breed" === $"a2.breed" && $"a1.sex" =!= $"a2.sex" && $"a1.owner_id" =!= $"a2.owner_id")
.join(owners.as("a3"), $"a1.owner_id" === $"a3.id", "full_outer")
.select(
  $"a1.breed".alias("breed1"), $"a1.sex".alias("sex1"), $"a1.owner_id".alias("owner1"),
  $"a2.breed".alias("breed2"), $"a2.sex".alias("sex2"), $"a2.owner_id".alias("owner2"),
  $"a3.id".alias("owner_id"))
//animalPair.show()
val finalOutput= animalPair.groupBy("owner_id").agg(count("sex2").alias("count")).sort(col("count").desc, col("owner_id"))
finalOutput.show()
}
}
The current output of the program above is:
+--------+-----+
|owner_id|count|
+--------+-----+
|       1|    9|
|       6|    6|
|      21|    4|
|       2|    3|
|       8|    3|
|      13|    3|
|      20|    3|
|      14|    2|
|       3|    0|
|       4|    0|
|       7|    0|
|      10|    0|
|      11|    0|
+--------+-----+
But I want to turn the output above into the one below:
owner_id,count
1,9
6,6
21,4
2,3
8,3
13,3
20,3
14,2
4,1
3,0
7,0
10,0
11,0
I found a solution:
println("owner_id,count")
finalOutput.rdd.map(_.mkString(",")).collect.foreach(println)
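The core of this trick is just joining each row's fields with a comma and printing a header line first. A minimal Spark-free sketch of the same formatting (the sample rows here are hypothetical, not the real query result):

```scala
object CsvFormat {
  // Prepend the header line, then render each (owner_id, count) pair as "id,count"
  def toCsv(rows: Seq[(Int, Int)]): Seq[String] =
    "owner_id,count" +: rows.map { case (id, n) => s"$id,$n" }

  def main(args: Array[String]): Unit =
    toCsv(Seq((1, 9), (6, 6), (21, 4))).foreach(println)
}
```

Note that `collect` pulls every row to the driver, which is fine for a small result like this one; for larger outputs, Spark's built-in CSV writer (`finalOutput.write.option("header", true).csv("out/")`) produces the same comma-separated format with a header directly as files, without collecting.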