我是Spark的新手,我有一个愚蠢的"什么是最好的方法;问题基本上,我有一个我想循环浏览的地图(dict(。在每次迭代过程中,我想使用rlike
regex搜索spark数据帧中的一列,并使用withColumn
将dict的键分配给一个新列
maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
"laundry": ["soap", "detergent", "fabric softener"]
}
数据样本显示在下方
+--------------------+-----------+
| id|item_bought|
+--------------------+-----------+
|uiq7Zq52Bww4pZXc3xri| Soap|
|fpJatwxTeObcbuJH25UI| Detergent|
|MdK1q5gBygIGFYyvbz8J| Milk|
+--------------------+-----------+
我想得到一个看起来像这样的数据帧:
+--------------------+-----------+---------+
| id|item_bought| class|
+--------------------+-----------+---------+
|uiq7Zq52Bww4pZXc3xri| Soap| Laundry|
|fpJatwxTeObcbuJH25UI| Detergent| Laundry|
|MdK1q5gBygIGFYyvbz8J| Milk|Groceries|
+--------------------+-----------+---------+
我有超过100M的记录,我想要一种使用Spark最佳实践(分布式计算(的方法。脑海中浮现的一种方法是循环浏览映射并使用rlike
或str.contains
进行正则表达式搜索,如下所示:
for key, value in maps.items():
pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case
df.withColumn("class", col("item_bought").rlike(pattern))
但这会为类似rlike的搜索返回true
或false
。我想用key
值替换true或false。
此外,考虑到我有100M(高达150M(的记录,在地图中循环是最好的方法吗?
编辑
如果df
中的items_bought
有特殊字符(或一些额外的文本(怎么办?
+--------------------+----------------+
| id| item_bought|
+--------------------+----------------+
|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J| Milk|
+--------------------+----------------+
我不想先做文本清理,只需要根据regex关键字搜索分配类
根据您的情况,我将把映射转换为数据帧。我认为生成的数据帧将相对较小。使用广播连接。这样做的目的是将小df分发到每个工作节点,从而避免混洗。
#Create df from maps
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought'))
#Broadcast join
df.join(broadcast(df_ref), how='left', on='item_bought').show()
+-----------+--------------------+---------+
|item_bought| id| class|
+-----------+--------------------+---------+
| Soap|uiq7Zq52Bww4pZXc3xri| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J|groceries|
+-----------+--------------------+---------+
按照您的编辑
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1'))
df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z]+',0)).join(broadcast(df_ref), how='left', on='item_bought1').show()
+------------+--------------------+----------------+---------+
|item_bought1| id| item_bought| class|
+------------+--------------------+----------------+---------+
| Soap|uiq7Zq52Bww4pZXc3xri| Soap| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| Detergent| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
| Soap|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg| laundry|
| Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
+------------+--------------------+----------------+---------+