加入两个数据框架,并使用另一个数据框架记录更新一个数据框



所以我有两个数据框架。像这样的数据框架1:

+----------+------+---------+--------+------+
|     OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|   136|        4|       1|  I|!||
|4295877346|   136|        4|       1|  I|!||
|4295877341|   138|        2|       1|  I|!||
|4295877341|   141|        4|       1|  I|!||
|4295877341|   143|        2|       1|  I|!||
|4295877341|   145|       14|       1|  I|!||
| 123456789|   145|       14|       1|  I|!||
| 809580109|   145|       9|        9|  I|!||
+----------+------+---------+--------+------+

dataFrame2如下

+----------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343|   149|         15|         2|    I|!||
|4295877341|   136|       null|      null|    I|!||
| 123456789|   145|         14|         1|    D|!||
|4295877341|   138|         11|        22|    I|!||
|4295877341|   141|         10|         1|    I|!||
|4295877341|   143|          1|         1|    I|!||
| 809580109|   145|       NULL|      NULL|    I|!||
+----------+------+-----------+----------+--------+

现在,我必须加入两个数据框架更新数据框架1列,其中匹配记录与数据框架2。

现在两个数据框中的关键是构成氧化和itemID。

因此,预期的输出应该是。

+----------+------+---------+--------+------+
|     OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|   136|        4|       1|  I|!||
|4295877341|   145|       14|       1|  I|!||
|4295877343|   149|       15|       2|  I|!||
|4295877341|   136|     null|    null|  I|!||
|4295877341|   138|       11|      22|  I|!||
|4295877341|   141|       10|       1|  I|!||
|4295877341|   143|        1|       1|  I|!||
| 809580109|   145|       9|        9|  I|!||
+----------+------+---------+--------+------+

因此,我需要使用数据框架2记录更新数据框架1。如果数据框架1中的记录在2中找不到,则我们还需要保留该记录。如果在DataFrame 2中找到了任何新记录,则需要在输出中添加记录

这是我在做的..

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
  .select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1")
  .filter(!$"Action_1".contains("D"))
    df3.show()

,但我要低于输出。

+----------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343|   149|         15|         2|    I|!||
|4295877341|   136|       null|      null|    I|!||
|4295877341|   138|         11|        22|    I|!||
|4295877341|   141|         10|         1|    I|!||
|4295877341|   143|          1|         1|    I|!||
+----------+------+-----------+----------+--------+

我没有从数据框架1 ...

获得4295877346| 136| 4| 1| I|!|记录

left_outer给我以下输出

+----------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877341|   136|       null|      null|    I|!||
|4295877341|   138|         11|        22|    I|!||
|4295877341|   141|         10|         1|    I|!||
|4295877341|   143|          1|         1|    I|!||
+----------+------+-----------+----------+--------+

让我首先解释一下你的错误。

如果您仅加入以下

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
df3.show()

您会得到

+----------+------+---------+--------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId|Sequence|Action|segmentId_1|Sequence_1|Action_1|
+----------+------+---------+--------+------+-----------+----------+--------+
|4295877346|   136|        4|       1|  I|!||       null|      null|    null|
|4295877341|   145|       14|       1|  I|!||       null|      null|    null|
|4295877343|   149|     null|    null|  null|         15|         2|    I|!||
|4295877341|   136|        4|       1|  I|!||       null|      null|    I|!||
| 123456789|   145|       14|       1|  I|!||         14|         1|    D|!||
|4295877341|   138|        2|       1|  I|!||         11|        22|    I|!||
|4295877341|   141|        4|       1|  I|!||         10|         1|    I|!||
|4295877341|   143|        2|       1|  I|!||          1|         1|    I|!||
+----------+------+---------+--------+------+-----------+----------+--------+

很明显您的代码中的filterAction_1列中也过滤null

因此,为您提供的工作代码是将join之后获得的null值更改为来自存在数据的其他表的有效数据。

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
  .withColumn("segmentId_1", when($"segmentId_1".isNotNull, $"segmentId_1").otherwise($"segmentId"))
  .withColumn("Sequence_1", when($"Sequence_1".isNotNull, $"Sequence_1").otherwise($"Sequence"))
  .withColumn("Action_1", when($"Action_1".isNotNull, $"Action_1").otherwise($"Action"))
  .select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1")
  .filter(!$"Action_1".contains("D") )
df3.show()

您应该将所需的输出作为

获得
+----------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877346|   136|          4|         1|    I|!||
|4295877341|   145|         14|         1|    I|!||
|4295877343|   149|         15|         2|    I|!||
|4295877341|   136|       null|      null|    I|!||
|4295877341|   138|         11|        22|    I|!||
|4295877341|   141|         10|         1|    I|!||
|4295877341|   143|          1|         1|    I|!||
+----------+------+-----------+----------+--------+

尝试左outer而不是外部:

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "left_outer")
  .select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1")
  .filter(!$"Action_1".contains("D"))
    df3.show()

左外部应保留所有不匹配的左侧。

这里一个不错的教程。

相关内容

  • 没有找到相关文章

最新更新