Pyspark:regex搜索带有Column的列表中的文本



我是Spark的新手,我有一个愚蠢的"什么是最好的方法;问题基本上,我有一个我想循环浏览的地图(dict(。在每次迭代过程中,我想使用rlikeregex搜索spark数据帧中的一列,并使用withColumn将dict的键分配给一个新列

maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
"laundry": ["soap", "detergent", "fabric softener"]
}

数据样本显示在下方

+--------------------+-----------+
|                  id|item_bought|
+--------------------+-----------+
|uiq7Zq52Bww4pZXc3xri|       Soap|
|fpJatwxTeObcbuJH25UI|  Detergent|
|MdK1q5gBygIGFYyvbz8J|       Milk|
+--------------------+-----------+

我想得到一个看起来像这样的数据帧:

+--------------------+-----------+---------+
|                  id|item_bought|    class|
+--------------------+-----------+---------+
|uiq7Zq52Bww4pZXc3xri|       Soap|  Laundry|
|fpJatwxTeObcbuJH25UI|  Detergent|  Laundry|
|MdK1q5gBygIGFYyvbz8J|       Milk|Groceries|
+--------------------+-----------+---------+

我有超过100M的记录,我想要一种使用Spark最佳实践(分布式计算(的方法。脑海中浮现的一种方法是循环浏览映射并使用rlikestr.contains进行正则表达式搜索,如下所示:

for key, value in maps.items():
pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case
df.withColumn("class", col("item_bought").rlike(pattern))

但这会为类似rlike的搜索返回truefalse。我想用key值替换true或false。

此外,考虑到我有100M(高达150M(的记录,在地图中循环是最好的方法吗?

编辑

如果df中的items_bought有特殊字符(或一些额外的文本(怎么办?

+--------------------+----------------+
|                  id|     item_bought|
+--------------------+----------------+
|uiq7Zq52Bww4pZXc3xri|   Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J|            Milk|
+--------------------+----------------+

我不想先做文本清理,只需要根据regex关键字搜索分配类

根据您的情况,我将把映射转换为数据帧。我认为生成的数据帧将相对较小。使用广播连接。这样做的目的是将小df分发到每个工作节点,从而避免混洗。

#Create df from maps
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought'))
#Broadcast join    
df.join(broadcast(df_ref), how='left', on='item_bought').show()

+-----------+--------------------+---------+
|item_bought|                  id|    class|
+-----------+--------------------+---------+
|       Soap|uiq7Zq52Bww4pZXc3xri|  laundry|
|  Detergent|fpJatwxTeObcbuJH25UI|  laundry|
|       Milk|MdK1q5gBygIGFYyvbz8J|groceries|
+-----------+--------------------+---------+

按照您的编辑

df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1'))

df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z]+',0)).join(broadcast(df_ref), how='left', on='item_bought1').show()
+------------+--------------------+----------------+---------+
|item_bought1|                  id|     item_bought|    class|
+------------+--------------------+----------------+---------+
|        Soap|uiq7Zq52Bww4pZXc3xri|            Soap|  laundry|
|   Detergent|fpJatwxTeObcbuJH25UI|       Detergent|  laundry|
|        Milk|MdK1q5gBygIGFYyvbz8J|            Milk|groceries|
|        Soap|uiq7Zq52Bww4pZXc3xri|   Soap -&ju10kg|  laundry|
|   Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|  laundry|
|        Milk|MdK1q5gBygIGFYyvbz8J|            Milk|groceries|

+------------+--------------------+----------------+---------+

最新更新