I need to do record linkage between two datasets based on equality or similarity of some fields. For example, suppose the datasets look like this (with some random data):
+----+---------+--------+----------+-----------------------------------------+
|A_ID|FirstName|LastName|BirthDate |Address                                  |
+----+---------+--------+----------+-----------------------------------------+
|0   |Vera     |Williams|12.03.1999|Colorado, Greeley, 3774 Stark Hollow Road|
|1   |Joseph   |Peters  |11.10.1988|Florida, Deltona, 4577 Willis Avenue     |
+----+---------+--------+----------+-----------------------------------------+
You can join your two dataframes. The most efficient way is to create some extra columns in dataframe A so that the join condition uses only column-equality comparisons; this prevents Spark from falling back to a very inefficient cartesian product when joining the two dataframes. You can do it like this:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Arrays;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.element_at;
import static org.apache.spark.sql.functions.split;
import scala.collection.JavaConverters;
...
Dataset<Row> outputDataframe = dataframeA
    // Derive the columns that dataframe B already has, so the join can be a pure equi-join.
    .withColumn("FullName", concat_ws(" ", col("FirstName"), col("LastName")))
    // The street is the last comma-separated component of the address.
    .withColumn("Street", element_at(split(col("Address"), ", "), -1))
    .join(dataframeB, JavaConverters.asScalaBuffer(Arrays.asList("Street", "FullName", "BirthDate")), "left_outer")
    // Drop the helper columns once the join is done.
    .drop("Street", "FullName");
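If you want to reproduce this end to end, here is a minimal sketch of how the two inputs could be built from the sample rows; the SparkSession variable spark is an assumption, and the schemas simply mirror the example tables:

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
...
// Schemas and rows copied from the example tables; `spark` is assumed to be an existing SparkSession.
StructType schemaA = new StructType()
    .add("A_ID", DataTypes.IntegerType)
    .add("FirstName", DataTypes.StringType)
    .add("LastName", DataTypes.StringType)
    .add("BirthDate", DataTypes.StringType)
    .add("Address", DataTypes.StringType);
Dataset<Row> dataframeA = spark.createDataFrame(Arrays.asList(
    RowFactory.create(0, "Vera", "Williams", "12.03.1999", "Colorado, Greeley, 3774 Stark Hollow Road"),
    RowFactory.create(1, "Joseph", "Peters", "11.10.1988", "Florida, Deltona, 4577 Willis Avenue")), schemaA);

StructType schemaB = new StructType()
    .add("B_ID", DataTypes.IntegerType)
    .add("FullName", DataTypes.StringType)
    .add("BirthDate", DataTypes.StringType)
    .add("Street", DataTypes.StringType);
Dataset<Row> dataframeB = spark.createDataFrame(Arrays.asList(
    RowFactory.create(37, "Joseph Peters", "11.10.1988", "4577 Willis Avenue"),
    RowFactory.create(49, "Valerie J Porter", "17.01.2000", "2114 Center Street")), schemaB);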
With your example dataframe A:
+----+---------+--------+----------+-----------------------------------------+
|A_ID|FirstName|LastName|BirthDate |Address                                  |
+----+---------+--------+----------+-----------------------------------------+
|0 |Vera |Williams|12.03.1999|Colorado, Greeley, 3774 Stark Hollow Road|
|1 |Joseph |Peters |11.10.1988|Florida, Deltona, 4577 Willis Avenue |
+----+---------+--------+----------+-----------------------------------------+
and dataframe B:
+----+----------------+----------+------------------+
|B_ID|FullName |BirthDate |Street |
+----+----------------+----------+------------------+
|37 |Joseph Peters |11.10.1988|4577 Willis Avenue|
|49 |Valerie J Porter|17.01.2000|2114 Center Street|
+----+----------------+----------+------------------+
you will get the following output dataframe:
+----------+----+---------+--------+-----------------------------------------+----+
|BirthDate |A_ID|FirstName|LastName|Address                                  |B_ID|
+----------+----+---------+--------+-----------------------------------------+----+
|12.03.1999|0 |Vera |Williams|Colorado, Greeley, 3774 Stark Hollow Road|null|
|11.10.1988|1 |Joseph |Peters |Florida, Deltona, 4577 Willis Avenue |37 |
+----------+----+---------+--------+-----------------------------------------+----+
Note: if you cannot easily derive exact-match columns from dataframe A, you can use Egor's solution. However, you may run into performance problems, because Spark will perform a cartesian product.
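Either way, you can check which join strategy Spark actually picked by printing the physical plan:

// Look for SortMergeJoin or BroadcastHashJoin nodes; a CartesianProduct or
// BroadcastNestedLoopJoin node means the join fell back to a non-equi strategy.
outputDataframe.explain();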
I have not tested this code with your data, but I hope it works. In Kotlin:
val datasetA: Dataset<Row> = ...
val datasetB: Dataset<Row> = ...

// Exact match on the birth date, containment checks for the name and street.
val condition = datasetA.col("BirthDate").equalTo(datasetB.col("BirthDate"))
    .and(datasetB.col("FullName").contains(datasetA.col("FirstName")))
    .and(datasetB.col("FullName").contains(datasetA.col("LastName")))
    .and(datasetA.col("Address").contains(datasetB.col("Street")))

val result = datasetA.join(datasetB, condition)
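Note that because this variant joins on a column condition rather than on column names, the result keeps the BirthDate column from both datasets; you can drop one of them afterwards, e.g. with result.drop(datasetB.col("BirthDate")), to avoid ambiguous column references later.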