Input table:

ID | Phone numbers
---|---
1 | 202-555-0188:202-555-0132
2 | 202-555-0104:202-555-0132
3 | 202-555-0104:202-555-0162
4 | 202-555-0181:202-555-0188:202-556-7191
5 | 518-555-0110
Create an array from the string, deduplicate, and rebuild the string (if needed):
import spark.implicits._
import org.apache.spark.sql.functions._

val data = Seq((1, "202-555-0188:202-555-0132"), (2, "202-555-0104:202-555-0132"))
val df = data.toDF("id", "phone")
// split the colon-separated string into an array, then explode to one row per number
val df2 = df.withColumn("ps", split($"phone", ":"))
val df3 = df2.withColumn("p", explode($"ps")).drop("ps", "phone")
// dropDuplicates keeps only the first occurrence of each number across all rows
val df4 = df3.dropDuplicates("p")
// regroup per id and rebuild the colon-separated string
val df5 = df4.groupBy($"id").agg(collect_list($"p").as("ps"))
df5.withColumn("phones", array_join($"ps", ":")).drop("ps").show(10, false)
+---+-------------------------+
|id |phones |
+---+-------------------------+
|1 |202-555-0188:202-555-0132|
|2 |202-555-0104 |
+---+-------------------------+
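Note that `dropDuplicates("p")` removes duplicates across all rows, not just within a row: each phone number survives only in the first row where it appears, which is why `202-555-0132` vanishes from id 2 in the output above. The same semantics can be sketched in plain Scala (no Spark), using a mutable set of numbers seen so far:

```scala
// Plain-Scala sketch of the cross-row dedup semantics:
// a number is kept only for the first id in which it appears.
val rows = Seq(
  (1, "202-555-0188:202-555-0132"),
  (2, "202-555-0104:202-555-0132")
)

val seen = scala.collection.mutable.Set.empty[String]
val deduped = rows.map { case (id, phones) =>
  val kept = phones.split(":").filter(seen.add) // add returns true on first sight
  (id, kept.mkString(":"))
}
// deduped == Seq((1, "202-555-0188:202-555-0132"), (2, "202-555-0104"))
```

Unlike this sequential sketch, `dropDuplicates` on a distributed DataFrame gives no guarantee about *which* occurrence is kept unless you enforce an ordering yourself.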
With the Spark Dataset API, it is always better to define the schema with a case class:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
import spark.implicits._

case class User(id: Int, phone_no: String)

val userDS = Seq(
  User(1, "202-555-0188:202-555-0132"),
  User(2, "202-555-0104:202-555-0132"),
  User(3, "202-555-0104:202-555-0162"),
  User(4, "202-555-0181:202-555-0188:202-556-7191"),
  User(5, "518-555-0110")
).toDS

// split each phone string into an array of numbers
val parsedDS = userDS.map(user => (user.id, user.phone_no.split(":")))

val outDS = parsedDS
  .select($"_1".as("ID"), explode($"_2").as("PhoneNo"))
  .dropDuplicates("PhoneNo") // keep each number's first occurrence only
  .groupBy("ID")
  .agg(concat_ws(":", collect_set($"PhoneNo")).as("Phone Number"))
  .orderBy("ID")

outDS.show(false)
+---+-------------------------+
|ID |Phone Number |
+---+-------------------------+
|1 |202-555-0132:202-555-0188|
|2 |202-555-0104 |
|3 |202-555-0162 |
|4 |202-555-0181:202-556-7191|
|5 |518-555-0110 |
+---+-------------------------+
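If the requirement were only to remove duplicates *within* each row (leaving other rows untouched), no explode/groupBy round trip is needed: in Spark 2.4+ you can apply `array_distinct(split($"phone", ":"))` directly. The per-row logic corresponds to this plain-Scala sketch, using a hypothetical row that repeats a number:

```scala
// Within-row dedup only: each row keeps its own first occurrences,
// independent of every other row.
val withinRow = Seq(
  (4, "202-555-0181:202-555-0188:202-555-0181")
).map { case (id, phones) =>
  (id, phones.split(":").distinct.mkString(":"))
}
// withinRow == Seq((4, "202-555-0181:202-555-0188"))
```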