我今天早些时候问过一个类似的问题。在这里。简短:我需要做两个大数据集(1.6M &6米)。我打算用斯帕克斯,认为我被警告过的笛卡尔积不会是一个大问题。但事实的确如此。它对性能的影响如此之大,以至于联动过程在7小时内没有完成。
是否有其他库/框架/工具更有效地做到这一点?或者改进下面解决方案的性能?
我最后写的代码:
object App {
def left(col: Column, n: Int) = {
assert(n > 0)
substring(col, 1, n)
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[4]")
.appName("MatchingApp")
.getOrCreate()
import spark.implicits._
val a = spark.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.load("/home/helveticau/workstuff/a.csv")
.withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
.withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))
val b = spark.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.load("/home/helveticau/workstuff/b.txt")
.withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
.withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))
// @formatter:off
val condition = a
.col("FULL_NAME").contains(b.col("FIRST_NAME"))
.and(a.col("FULL_NAME").contains(b.col("LAST_NAME")))
.and(a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
.or(a.col("STREET").startsWith(left(b.col("STR"), 3))))
// @formatter:on
val startMillis = System.currentTimeMillis();
val res = a.join(b, condition, "left_outer")
val count = res
.filter(col("B_ID").isNotNull)
.count()
println(s"Count: $count")
val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
println(s"Execution time: ${executionTime.toMinutes}m")
}
}
可能情况太复杂了,但一定是这样。
您可以通过稍微改变一下执行链接的逻辑来提高当前解决方案的性能:
- 首先执行内连接
a
和b
的数据帧,其中包含您知道匹配的列。在您的情况下,它似乎是LAST_NAME
和FIRST_NAME
列。 < - ,过滤器/strong>在您的情况下,出生日期相等或街道匹配条件。
- 最后,如果你也需要保持不联系记录,执行对加入
a
数据帧
你的代码可以重写如下:
import org.apache.spark.sql.functions.{col, substring, to_date}
import org.apache.spark.sql.SparkSession
import java.time.Duration
object App {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[4]")
.appName("MatchingApp")
.getOrCreate()
val a = spark.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.load("/home/helveticau/workstuff/a.csv")
.withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))
val b = spark.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.load("/home/helveticau/workstuff/b.txt")
.withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))
val condition = a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
.or(a.col("STREET").startsWith(substring(b.col("STR"), 1, 3)))
val startMillis = System.currentTimeMillis();
val res = a.join(b, Seq("LAST_NAME", "FIRST_NAME"))
.filter(condition)
// two following lines optional if you want to only keep records with not null B_ID
.select("B_ID", "A_ID")
.join(a, Seq("A_ID"), "right_outer")
val count = res
.filter(col("B_ID").isNotNull)
.count()
println(s"Count: $count")
val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
println(s"Execution time: ${executionTime.toMinutes}m")
}
}
这样你就可以避免笛卡尔积的代价是两个连接而不是只有一个连接。
例子文件a.csv
包含以下数据:
"A_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STREET"
10;John;Doe;1965-10-21;Johnson Road
11;Rebecca;Davis;1977-02-27;Lincoln Road
12;Samantha;Johns;1954-03-31;Main Street
13;Roger;Penrose;1987-12-25;Oxford Street
14;Robert;Smith;1981-08-26;Canergie Road
15;Britney;Stark;1983-09-27;Alshire Road
和b.txt
具有以下数据:
"B_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STR"
29;John;Doe;21.10.1965;Johnson Road
28;Rebecca;Davis;28.03.1986;Lincoln Road
27;Shirley;Iron;30.01.1956;Oak Street
26;Roger;Penrose;25.12.1987;York Street
25;Robert;Dayton;26.08.1956;Canergie Road
24;Britney;Stark;22.06.1962;Algon Road
res
dataframe将为:
+----+----+----------+---------+----------+-------------+
|A_ID|B_ID|FIRST_NAME|LAST_NAME|BIRTH_DATE|STREET |
+----+----+----------+---------+----------+-------------+
|10 |29 |John |Doe |1965-10-21|Johnson Road |
|11 |28 |Rebecca |Davis |1977-02-27|Lincoln Road |
|12 |null|Samantha |Johns |1954-03-31|Main Street |
|13 |26 |Roger |Penrose |1987-12-25|Oxford Street|
|14 |null|Robert |Smith |1981-08-26|Canergie Road|
|15 |null|Britney |Stark |1983-09-27|Alshire Road |
+----+----+----------+---------+----------+-------------+
注意:如果您的
FIRST_NAME
和LAST_NAME
列不完全相同,您可以尝试使它们与Spark的内置函数匹配,例如:
trim
删除字符串开头和结尾的空格lower
将列转换为小写(因此在比较中忽略大小写)真正重要的是要有最大数量的完全匹配的列。