How to filter out all duplicate phone numbers in Spark/SQL



Input table:

+---+--------------------------------------+
|ID |Phone Number                          |
+---+--------------------------------------+
|1  |202-555-0188:202-555-0132             |
|2  |202-555-0104:202-555-0132             |
|3  |202-555-0104:202-555-0162             |
|4  |202-555-0181:202-555-0188:202-556-7191|
|5  |518-555-0110                          |
+---+--------------------------------------+

Create an array from the string, deduplicate, and rebuild the string (if needed):

import spark.implicits._
import org.apache.spark.sql.functions._

val data = Seq((1, "202-555-0188:202-555-0132"), (2, "202-555-0104:202-555-0132"))
val df  = data.toDF("id", "phone")
val df2 = df.withColumn("ps", split($"phone", ":"))               // string -> array of numbers
val df3 = df2.withColumn("p", explode($"ps")).drop("ps", "phone") // one row per number
val df4 = df3.dropDuplicates("p")   // global dedup: keeps one (arbitrary) row per number
val df5 = df4.groupBy($"id").agg(collect_list($"p").as("ps"))     // regroup per id
df5.withColumn("phones", array_join($"ps", ":")).drop("ps").show(10, false)
+---+-------------------------+
|id |phones                   |
+---+-------------------------+
|1  |202-555-0188:202-555-0132|
|2  |202-555-0104             |
+---+-------------------------+
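Since the question mentions Spark SQL, the same global dedup can be expressed in SQL over the `df` built above. A minimal sketch: the temp view name `users` and the "keep the lowest ID" tie-break (via `row_number`) are my assumptions; note this makes the result deterministic, whereas `dropDuplicates` above keeps an arbitrary row per number.

// Sketch only: view name `users` and the ORDER BY id tie-break are assumptions
df.createOrReplaceTempView("users")
spark.sql("""
  SELECT id, concat_ws(':', collect_list(phone)) AS phones
  FROM (
    SELECT id, phone, row_number() OVER (PARTITION BY phone ORDER BY id) AS rn
    FROM (SELECT id, explode(split(phone, ':')) AS phone FROM users) t
  ) d
  WHERE rn = 1
  GROUP BY id
  ORDER BY id
""").show(false)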

With the Spark Dataset API, it is generally better to define the schema with a case class:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
import spark.implicits._

case class User(id: Int, phone_no: String)
val userDS = Seq(User(1, "202-555-0188:202-555-0132"), User(2, "202-555-0104:202-555-0132"),
  User(3, "202-555-0104:202-555-0162"), User(4, "202-555-0181:202-555-0188:202-556-7191"),
  User(5, "518-555-0110")).toDS()

val parsedDS = userDS.map(user => (user.id, user.phone_no.split(":"))) // id -> array of numbers
val outDS = parsedDS
  .select($"_1".as("ID"), explode($"_2").as("PhoneNo")) // one row per number
  .dropDuplicates("PhoneNo")                            // global dedup across all IDs
  .groupBy("ID")
  .agg(concat_ws(":", collect_set($"PhoneNo")).as("Phone Number"))
  .orderBy("ID")
outDS.show(false)

+---+-------------------------+
|ID |Phone Number             |
+---+-------------------------+
|1  |202-555-0132:202-555-0188|
|2  |202-555-0104             |
|3  |202-555-0162             |
|4  |202-555-0181:202-556-7191|
|5  |518-555-0110             |
+---+-------------------------+
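Note that both answers deduplicate globally: a number appearing under two IDs survives under only one of them (e.g. 202-555-0132 stays with ID 1 but is removed from ID 2). If the goal is instead to remove duplicates only within each row, Spark 2.4+ can do it in one step with array_distinct. A minimal sketch reusing the userDS and imports from above; the sample data happens to have no within-row duplicates, so here it returns the input unchanged:

// Per-row variant (Spark 2.4+): dedup within each phone list, no cross-row comparison
val perRow = userDS
  .withColumn("phones", array_distinct(split($"phone_no", ":")))
  .select($"id".as("ID"), array_join($"phones", ":").as("Phone Number"))
perRow.show(false)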
