How to remove numeric strings from an array in PySpark



I am trying to remove the numeric words from my array of words, but the function I created is not working correctly. When I try to view the data in the dataframe, the error message below is shown.

First, I transformed my string into word tokens:

from pyspark.ml.feature import RegexTokenizer

# Split the description into lowercase word tokens on non-word characters
regexTokenizer = RegexTokenizer(
    inputCol="description",
    outputCol="words_withnumber",
    pattern=r"\W"
)
data = regexTokenizer.transform(data)

I created a function that removes only the numbers:

from pyspark.sql.functions import when, udf
from pyspark.sql.types import BooleanType

# Return True if the value is a purely numeric string
def is_digit(value):
    if value:
        return value.isdigit()
    else:
        return False

is_digit_udf = udf(is_digit, BooleanType())

Calling the function:

data = data.withColumn(
    'words_withoutnumber', 
    when(~is_digit_udf(data['words_withnumber']), data['words_withnumber'])
)

Error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14, 10.139.64.4, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

Sample dataframe:

+-----------+--------------------------------------------------------------+
|categoryid |description                                                   |
+-----------+--------------------------------------------------------------+
|      33004|["short", "sarja", "40567", "detalhe", "couro"]               | 
|      22033|["multipane", "6768686868686867868888", "220v", "branco"]     | 
+-----------+--------------------------------------------------------------+

Expected result:

+-----------+--------------------------------------------------------------+
|categoryid |description                                                   |
+-----------+--------------------------------------------------------------+
|      33004|["short", "sarja", "detalhe", "couro"]                        | 
|      22033|["multipane", "220v", "branco"]                               |
+-----------+--------------------------------------------------------------+

With help from @pault, the solution is as follows (the original attempt fails because the UDF is applied to the whole array column, while is_digit expects a single string).

from pyspark.sql.functions import when, udf
from pyspark.sql.types import BooleanType

# Return True if the value is a purely numeric string
def is_digit(value):
    if value:
        return value.isdigit()
    else:
        return False

is_digit_udf = udf(is_digit, BooleanType())

Calling the function:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType

# Keep only the array elements that are not purely numeric
filter_length_udf = udf(lambda row: [x for x in row if not is_digit(x)], ArrayType(StringType()))
data = data.withColumn('words_clean', filter_length_udf(col('words_withnumber')))
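
As a quick sanity check (assuming data is the tokenized dataframe from above), you can compare the two columns:

data.select('words_withnumber', 'words_clean').show(truncate=False)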

If you want to avoid udf() for performance reasons, and if commas will not appear in your "description" column, then the Scala solution below works. df.withColumn() should be similar in PySpark.

Note: I also added a third record to show that the solution works when numbers appear at the beginning/end of the array. Try it.

scala> val df = Seq((33004,Array("short","sarja", "40567","detalhe","couro")), (22033,Array("multipane","6768686868686867868888","220v","branco")), (33033,Array("0123","x220","220v","889"))).toDF("categoryid","description")
df: org.apache.spark.sql.DataFrame = [categoryid: int, description: array<string>]
scala> df.show(false)
+----------+-------------------------------------------------+
|categoryid|description                                      |
+----------+-------------------------------------------------+
|33004     |[short, sarja, 40567, detalhe, couro]            |
|22033     |[multipane, 6768686868686867868888, 220v, branco]|
|33033     |[0123, x220, 220v, 889]                          |
+----------+-------------------------------------------------+

scala> df.withColumn("newc",split(regexp_replace(regexp_replace(regexp_replace(concat_ws(",",'description),"""bd+b""",""),"""^,|,$""",""),",,",","),",")).show(false)
+----------+-------------------------------------------------+------------------------------+
|categoryid|description                                      |newc                          |
+----------+-------------------------------------------------+------------------------------+
|33004     |[short, sarja, 40567, detalhe, couro]            |[short, sarja, detalhe, couro]|
|22033     |[multipane, 6768686868686867868888, 220v, branco]|[multipane, 220v, branco]     |
|33033     |[0123, x220, 220v, 889]                          |[x220, 220v]                  |
+----------+-------------------------------------------------+------------------------------+

scala>
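
A rough PySpark equivalent of the same no-UDF approach (a sketch under the same assumption that commas never appear inside the tokens; column names follow the question above) would be:

from pyspark.sql.functions import concat_ws, regexp_replace, split

# Join the tokens with commas, strip purely numeric tokens, clean up the
# leftover separators, and split back into an array
data = data.withColumn(
    'words_withoutnumber',
    split(
        regexp_replace(
            regexp_replace(
                regexp_replace(concat_ws(',', 'words_withnumber'), r'\b\d+\b', ''),
                '^,|,$', ''),
            ',,', ','),
        ','))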

Spark 2.4 answer

With spark-sql in 2.4 and above, you can use the filter() higher-order function to get the result:

scala> val df = Seq((33004,Array("short","sarja", "40567","detalhe","couro")), (22033,Array("multipane","6768686868686867868888","220v","branco")), (33033,Array("0123","x220","220v","889"))).toDF("categoryid","description")
df: org.apache.spark.sql.DataFrame = [categoryid: int, description: array<string>]
scala> df.createOrReplaceTempView("tab")
scala> spark.sql(""" select categoryid, filter(description, x -> lower(x)!=upper(x)) fw from tab """).show(false)
+----------+------------------------------+
|categoryid|fw                            |
+----------+------------------------------+
|33004     |[short, sarja, detalhe, couro]|
|22033     |[multipane, 220v, branco]     |
|33033     |[x220, 220v]                  |
+----------+------------------------------+

scala>

Since you mentioned pyspark, let's include Python as well.

DataFrame

data = [(33004, ['short', 'sarja', '40567', 'detalhe', 'couro']),
        (22033, ['multipane', '6768686868686867868888', '220v', 'branco']),
        (33033, ['123', 'x220', '220v', '889'])]
df = spark.createDataFrame(data, ('categoryid', 'description'))

Code

# Only alphanumeric

df.withColumn("description_filtered", expr("filter(description, x -> x rlike '([a-z]+)')")).show()

Result

+----------+--------------------+--------------------+
|categoryid|         description|description_filtered|
+----------+--------------------+--------------------+
|     33004|[short, sarja, 40...|[short, sarja, de...|
|     22033|[multipane, 67686...|[multipane, 220v,...|
|     33033|[123, x220, 220v,...|        [x220, 220v]|
+----------+--------------------+--------------------+

# Only alphabetic

df.withColumn("description_filtered", expr("filter(description, x -> x rlike '([^0-9]{2})')")).show()

Result

+----------+--------------------+--------------------+
|categoryid|         description|description_filtered|
+----------+--------------------+--------------------+
|     33004|[short, sarja, 40...|[short, sarja, de...|
|     22033|[multipane, 67686...| [multipane, branco]|
|     33033|[123, x220, 220v,...|                  []|
+----------+--------------------+--------------------+
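
If the goal is strictly to drop tokens that are entirely numeric (as in the original question), a more direct predicate is possible; this is a sketch rather than one of the answers above:

df.withColumn("description_filtered", expr("filter(description, x -> not x rlike '^[0-9]+$')")).show()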

Spark 3.2+ has try_divide. If the attempt fails (which happens when the provided value is not numeric or is null), the function returns null. We can use it together with filter to remove numeric values from the array.

Full example:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('33004', ["short", "sarja",  "40567", "detalhe", "couro"]),
     ('22033', ["multipane", "6768686868686867868888", "220v", "branco"]),],
    ['categoryid', 'description']
)
df = df.withColumn(
    'description',
    F.expr("filter(description, x -> try_divide(x, 1) is null)")
)
df.show(truncate=0)
# +----------+------------------------------+
# |categoryid|description                   |
# +----------+------------------------------+
# |33004     |[short, sarja, detalhe, couro]|
# |22033     |[multipane, 220v, branco]     |
# +----------+------------------------------+
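
On versions before 3.2, a similar effect can be achieved with a plain cast; this is a sketch that assumes ANSI mode is off, so casting a non-numeric string returns null rather than raising an error:

df = df.withColumn(
    'description',
    # keep elements that do not cast to a number
    F.expr("filter(description, x -> cast(x as double) is null)")
)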
