我有一个类似的Spark数据集
+----------+-------+----+---+--------------+
| _1| _2| _3| _4| _5|
+----------+-------+----+---+--------------+
| null|1111111|null| 15| [98765]|
| null|2222222|null| 16|[97008, 98765]|
|6436334664|3333333|null| 15| [97008]|
|2356242642|4444444|null| 11| [97008]|
+----------+-------+----+---+--------------+
其中第五列是与该行相关联的邮政编码列表。我有另一个表,每个邮政编码和相应的经度和纬度都有唯一的行。我想创建一个类似的表
+----------+-------+----+---+--------------+-----------------------------------
| _1| _2| _3| _4| _5| _6|
+----------+-------+----+---+--------------+----------------------------------+
|3572893528|1111111|null| 15| [98765]| [(54.12,-80.53)] |
|5325232523|2222222|null| 16|[98765, 97008]| [(54.12,-80.53), (44.12,-75.11)] |
|6436334664|3333333|null| 15| [97008]| [(54.12,-80.53)] |
|2356242642|4444444|null| 11| [97008]| [(54.12,-80.53)] |
+----------+-------+----+---+--------------+----------------------------------+
其中第六列是第五列的序列中的zip的坐标。
每次我需要坐标时,我都试图过滤邮政编码表,但我得到了一个NPE,我想是因为这个问题中详细说明的类似原因。如果我试图在过滤之前收集邮政编码表,我就会耗尽内存。
我使用的是Scala,我在Spark作业中使用Spark SQL获得了原始数据集。任何解决方案都将不胜感激,谢谢。
让我们假设(对您的问题的评论成立(我们有两个数据集(简化您的示例(,分别是ds
和ds2
:
+---+--------------+
|_1 |_2 |
+---+--------------+
|15 |[98765] |
|16 |[97008, 98765]|
|15 |[97008] |
|15 |[97008] |
+---+--------------+
+-----+---------------+
|_2 |_3 |
+-----+---------------+
|98765|{54.12, -80.53}|
|97008|{44.12, -75.11}|
+-----+---------------+
这个想法是创建一个唯一的ID(这样我们以后可以加入(,explode
数据集,然后join
来获得每个唯一ID的坐标,最后再次加入表。
创建唯一ID:
ds = ds.withColumn("id", monotonically_increasing_id())
然后创建包含id
和邮政编码的映射表:
val map = ds
.withColumn("_2", explode(col("_2")))
.join(ds2, Seq("_2"), "left")
.groupBy("id").agg(collect_set(col("_3")))
最后回到主表:
ds = ds.join(map, Seq("id"))
最终输出:
+---+--------------+----------------------------------+
|_1 |_2 |collect_set(_3) |
+---+--------------+----------------------------------+
|15 |[98765] |[{54.12, -80.53}] |
|16 |[97008, 98765]|[{54.12, -80.53}, {44.12, -75.11}]|
|15 |[97008] |[{44.12, -75.11}] |
|15 |[97008] |[{44.12, -75.11}] |
+---+--------------+----------------------------------+
祝你好运!