列表中每个项目的Spark Join



我有一个类似的Spark数据集

+----------+-------+----+---+--------------+
|        _1|     _2|  _3| _4|            _5|
+----------+-------+----+---+--------------+
|      null|1111111|null| 15|       [98765]|
|      null|2222222|null| 16|[97008, 98765]|
|6436334664|3333333|null| 15|       [97008]|
|2356242642|4444444|null| 11|       [97008]|
+----------+-------+----+---+--------------+

其中第五列是与该行相关联的邮政编码列表。我有另一个表,每个邮政编码和相应的经度和纬度都有唯一的行。我想创建一个类似的表

+----------+-------+----+---+--------------+-----------------------------------
|        _1|     _2|  _3| _4|            _5|                                _6|
+----------+-------+----+---+--------------+----------------------------------+
|3572893528|1111111|null| 15|       [98765]| [(54.12,-80.53)]                 |
|5325232523|2222222|null| 16|[98765, 97008]| [(54.12,-80.53), (44.12,-75.11)] |
|6436334664|3333333|null| 15|       [97008]| [(54.12,-80.53)]                 | 
|2356242642|4444444|null| 11|       [97008]| [(54.12,-80.53)]                 |
+----------+-------+----+---+--------------+----------------------------------+

其中第六列是第五列的序列中的zip的坐标。

每次我需要坐标时,我都试图过滤邮政编码表,但我得到了一个NPE,我想是因为这个问题中详细说明的类似原因。如果我试图在过滤之前收集邮政编码表,我就会耗尽内存。

我使用的是Scala,我在Spark作业中使用Spark SQL获得了原始数据集。任何解决方案都将不胜感激,谢谢。

让我们假设(对您的问题的评论成立(我们有两个数据集(简化您的示例(,分别是dsds2

+---+--------------+
|_1 |_2            |
+---+--------------+
|15 |[98765]       |
|16 |[97008, 98765]|
|15 |[97008]       |
|15 |[97008]       |
+---+--------------+
+-----+---------------+
|_2   |_3             |
+-----+---------------+
|98765|{54.12, -80.53}|
|97008|{44.12, -75.11}|
+-----+---------------+

这个想法是创建一个唯一的ID(这样我们以后可以加入(,explode数据集,然后join来获得每个唯一ID的坐标,最后再次加入表。

创建唯一ID:

ds = ds.withColumn("id", monotonically_increasing_id())

然后创建包含id和邮政编码的映射表:

val map = ds
.withColumn("_2", explode(col("_2")))
.join(ds2, Seq("_2"), "left")
.groupBy("id").agg(collect_set(col("_3")))

最后回到主表:

ds = ds.join(map, Seq("id"))

最终输出:

+---+--------------+----------------------------------+
|_1 |_2            |collect_set(_3)                   |
+---+--------------+----------------------------------+
|15 |[98765]       |[{54.12, -80.53}]                 |
|16 |[97008, 98765]|[{54.12, -80.53}, {44.12, -75.11}]|
|15 |[97008]       |[{44.12, -75.11}]                 |
|15 |[97008]       |[{44.12, -75.11}]                 |
+---+--------------+----------------------------------+

祝你好运!

最新更新