所以我有两个数据框架。像这样的数据框架1:
+----------+------+---------+--------+------+
| OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341| 136| 4| 1| I|!||
|4295877346| 136| 4| 1| I|!||
|4295877341| 138| 2| 1| I|!||
|4295877341| 141| 4| 1| I|!||
|4295877341| 143| 2| 1| I|!||
|4295877341| 145| 14| 1| I|!||
| 123456789| 145| 14| 1| I|!||
| 809580109| 145| 9| 9| I|!||
+----------+------+---------+--------+------+
dataFrame2如下
+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
| 123456789| 145| 14| 1| D|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
| 809580109| 145| NULL| NULL| I|!||
+----------+------+-----------+----------+--------+
现在,我必须加入两个数据框架更新数据框架1列,其中匹配记录与数据框架2。
现在两个数据框中的关键是构成氧化和itemID。
因此,预期的输出应该是。
+----------+------+---------+--------+------+
| OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346| 136| 4| 1| I|!||
|4295877341| 145| 14| 1| I|!||
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
| 809580109| 145| 9| 9| I|!||
+----------+------+---------+--------+------+
因此,我需要使用数据框架2记录更新数据框架1。如果数据框架1中的记录在2中找不到,则我们还需要保留该记录。如果在DataFrame 2中找到了任何新记录,则需要在输出中添加记录
这是我在做的..
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
.select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1")
.filter(!$"Action_1".contains("D"))
df3.show()
,但我要低于输出。
+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+
我没有从数据框架1 ...
获得4295877346| 136| 4| 1| I|!|
记录left_outer给我以下输出
+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+
让我首先解释一下你的错误。
如果您仅加入以下
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
df3.show()
您会得到
+----------+------+---------+--------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId|Sequence|Action|segmentId_1|Sequence_1|Action_1|
+----------+------+---------+--------+------+-----------+----------+--------+
|4295877346| 136| 4| 1| I|!|| null| null| null|
|4295877341| 145| 14| 1| I|!|| null| null| null|
|4295877343| 149| null| null| null| 15| 2| I|!||
|4295877341| 136| 4| 1| I|!|| null| null| I|!||
| 123456789| 145| 14| 1| I|!|| 14| 1| D|!||
|4295877341| 138| 2| 1| I|!|| 11| 22| I|!||
|4295877341| 141| 4| 1| I|!|| 10| 1| I|!||
|4295877341| 143| 2| 1| I|!|| 1| 1| I|!||
+----------+------+---------+--------+------+-----------+----------+--------+
很明显您的代码中的filter
在Action_1
列中也过滤null
因此,为您提供的工作代码是将join
之后获得的null
值更改为来自存在数据的其他表的有效数据。
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
.withColumn("segmentId_1", when($"segmentId_1".isNotNull, $"segmentId_1").otherwise($"segmentId"))
.withColumn("Sequence_1", when($"Sequence_1".isNotNull, $"Sequence_1").otherwise($"Sequence"))
.withColumn("Action_1", when($"Action_1".isNotNull, $"Action_1").otherwise($"Action"))
.select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1")
.filter(!$"Action_1".contains("D") )
df3.show()
您应该将所需的输出作为
获得+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877346| 136| 4| 1| I|!||
|4295877341| 145| 14| 1| I|!||
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+
尝试左outer而不是外部:
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "left_outer")
.select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1")
.filter(!$"Action_1".contains("D"))
df3.show()
左外部应保留所有不匹配的左侧。
这里一个不错的教程。