Trim a string column in a PySpark DataFrame



After creating a Spark DataFrame from a CSV file, I would like to trim a column. I tried:

df = df.withColumn("Product", df.Product.strip())

df is my DataFrame, and Product is a column in my table.

But I get the error:

Column object is not callable

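The PySpark version of the strip function is called trim:

Trim the spaces from both ends for the specified string column.

Make sure to import the function first and to pass the column you are trimming to the function.

The following should work: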

from pyspark.sql.functions import trim
df = df.withColumn("Product", trim(df.Product))
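
As a side note on why the original attempt reports "not callable" instead of a missing attribute: as far as I understand, a PySpark Column treats an unknown attribute such as strip as a nested-field lookup and returns another Column, which then cannot be called. The sketch below imitates that behaviour in plain Python. ToyColumn is a hypothetical stand-in written for illustration, not the real PySpark class.

```python
class ToyColumn:
    """Hypothetical stand-in for a Spark Column; not the real PySpark class."""

    def __init__(self, name):
        self.name = name

    def __getattr__(self, item):
        # Only invoked for attributes that normal lookup does not find.
        # Unknown names are treated like nested-field access and yield
        # another column-like object instead of raising AttributeError.
        if item.startswith("__"):
            raise AttributeError(item)
        return ToyColumn(f"{self.name}.{item}")


product = ToyColumn("Product")
field = product.strip            # no error: just another ToyColumn
print(field.name)                # Product.strip

try:
    product.strip()              # calling the returned object fails
except TypeError as exc:
    error_message = str(exc)
    print(error_message)         # 'ToyColumn' object is not callable
```

This is why the fix is to pass the column to a function (trim) rather than to call a method on the column.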

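Starting from version 1.5, Spark SQL provides two specific functions for trimming whitespace, ltrim and rtrim (search for "trim" in the DataFrame documentation); you will need to import pyspark.sql.functions first. Here is an example: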

 from pyspark.sql import SQLContext
 from pyspark.sql.functions import *
 sqlContext = SQLContext(sc)
 df = sqlContext.createDataFrame([(' 2015-04-08 ',' 2015-05-10 ')], ['d1', 'd2']) # create a dataframe - notice the extra whitespaces in the date strings
 df.collect()
 # [Row(d1=u' 2015-04-08 ', d2=u' 2015-05-10 ')]
 df = df.withColumn('d1', ltrim(df.d1)) # trim left whitespace from column d1
 df.collect()
 # [Row(d1=u'2015-04-08 ', d2=u' 2015-05-10 ')]
 df = df.withColumn('d1', rtrim(df.d1))  # trim right whitespace from d1
 df.collect()
 # [Row(d1=u'2015-04-08', d2=u' 2015-05-10 ')]

If you need to do this for all columns in the DataFrame:

from pyspark.sql import functions as f
for colname in df.columns:
    df = df.withColumn(colname, f.trim(f.col(colname)))

I did that with a udf like this:

from pyspark.sql.functions import udf
def trim(string):
    return string.strip()
trim=udf(trim)
df = sqlContext.createDataFrame([(' 2015-04-08 ',' 2015-05-10 ')], ['d1', 'd2'])
df2 = df.select(trim(df['d1']).alias('d1'),trim(df['d2']).alias('d2'))

The output looks like this:

df.show()
df2.show()
+------------+------------+
|          d1|          d2|
+------------+------------+
| 2015-04-08 | 2015-05-10 |
+------------+------------+
+----------+----------+
|        d1|        d2|
+----------+----------+
|2015-04-08|2015-05-10|
+----------+----------+
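
One caveat with the udf approach: Python's str.strip() with no argument removes every kind of leading and trailing whitespace, while the built-in trim is described in this thread as removing only spaces in Spark 2.0.1+. The plain-Python sketch below shows the difference; the sample value is made up for illustration.

```python
raw = "\t 2015-04-08 \n"

# str.strip() with no argument removes all leading/trailing whitespace,
# including tabs and newlines.
all_whitespace_removed = raw.strip()

# str.strip(" ") removes only space characters, which is closer to what
# the built-in trim is described as doing in Spark 2.0.1+.
only_spaces_removed = raw.strip(" ")

print(repr(all_whitespace_removed))  # '2015-04-08'
print(repr(only_spaces_removed))     # '\t 2015-04-08 \n'
```

So a udf based on strip() and the built-in trim may give different results on values that contain tabs or newlines at the ends.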

Summary

Trim spaces

  • on both sides:
    Spark 3.0.0+ F.trim("col_name")
    Spark 2.0.1+ F.trim(F.col("col_name"))
  • on the left:
    Spark 3.0.0+ F.ltrim("col_name")
    Spark 2.0.1+ F.ltrim(F.col("col_name"))
  • on the right:
    Spark 3.0.0+ F.rtrim("col_name")
    Spark 2.0.1+ F.rtrim(F.col("col_name"))

Trim specified characters (e.g. spaces and tabs):

  • on both sides:
    Spark 3.2.0+ F.expr("BTRIM(col_name, ' \t')")
    Spark 2.3.0+ F.expr("TRIM(BOTH ' \t' FROM col_name)")
    F.regexp_replace("col_name", r"^[ \t]+|[ \t]+$", "")
  • on the left:
    Spark 2.3.0+ F.expr("TRIM(LEADING ' \t' FROM col_name)")
    F.regexp_replace("col_name", r"^[ \t]+", "")
  • on the right:
    Spark 2.3.0+ F.expr("TRIM(TRAILING ' \t' FROM col_name)")
    F.regexp_replace("col_name", r"[ \t]+$", "")

Trim white space

  • on both sides:
    F.regexp_replace("col_name", r"^\s+|\s+$", "")
    Spark 1.5.0-2.0.0 F.trim(F.col("col_name"))
  • on the left:
    F.regexp_replace("col_name", r"^\s+", "")
    Spark 1.5.0-2.0.0 F.ltrim(F.col("col_name"))
  • on the right:
    F.regexp_replace("col_name", r"\s+$", "")
    Spark 1.5.0-2.0.0 F.rtrim(F.col("col_name"))
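
The regular expressions in the summary can be sanity-checked locally before running them on a cluster. The sketch below uses Python's re module rather than Spark; Spark evaluates regexp_replace with Java regular expressions, so this is only an approximation, assuming these simple patterns behave the same in both engines for inputs made of spaces and tabs.

```python
import re

sample = "\t\t EXAMPLE_STRING_VALUE  \t"

# Same patterns as passed to F.regexp_replace in the summary.
both = re.sub(r"^\s+|\s+$", "", sample)
left = re.sub(r"^\s+", "", sample)
right = re.sub(r"\s+$", "", sample)
tabs_and_spaces = re.sub(r"^[ \t]+|[ \t]+$", "", sample)

print(repr(both))             # 'EXAMPLE_STRING_VALUE'
print(repr(left))             # 'EXAMPLE_STRING_VALUE  \t'
print(repr(right))            # '\t\t EXAMPLE_STRING_VALUE'
print(repr(tabs_and_spaces))  # 'EXAMPLE_STRING_VALUE'
```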

Examples

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()

Trim spaces on both sides:

df = spark.range(1).withColumn("id", F.lit("trim_spaces_on_both_sides")) \
                   .withColumn("input", F.lit("  EXAMPLE_STRING_VALUE ")) \
                   .withColumn("output", F.trim("input"))
df.show(1, 0)
#+-------------------------+-----------------------+--------------------+
#|id                       |input                  |output              |
#+-------------------------+-----------------------+--------------------+
#|trim_spaces_on_both_sides|  EXAMPLE_STRING_VALUE |EXAMPLE_STRING_VALUE|
#+-------------------------+-----------------------+--------------------+

Trim spaces on the left:

df = spark.range(1).withColumn("id", F.lit("trim_spaces_on_left")) \
                   .withColumn("input", F.lit("  EXAMPLE_STRING_VALUE ")) \
                   .withColumn("output", F.ltrim("input"))
df.show(1, 0)
#+-------------------+-----------------------+---------------------+
#|id                 |input                  |output               |
#+-------------------+-----------------------+---------------------+
#|trim_spaces_on_left|  EXAMPLE_STRING_VALUE |EXAMPLE_STRING_VALUE |
#+-------------------+-----------------------+---------------------+

Trim spaces on the right:

df = spark.range(1).withColumn("id", F.lit("trim_spaces_on_right")) \
                   .withColumn("input", F.lit("  EXAMPLE_STRING_VALUE ")) \
                   .withColumn("output", F.rtrim("input"))
df.show(1, 0)
#+--------------------+-----------------------+----------------------+
#|id                  |input                  |output                |
#+--------------------+-----------------------+----------------------+
#|trim_spaces_on_right|  EXAMPLE_STRING_VALUE |  EXAMPLE_STRING_VALUE|
#+--------------------+-----------------------+----------------------+

Trim tabs and spaces on both sides:

df = spark.range(1).withColumn("id", F.lit("trim_tabs_and_spaces_on_both_sides")) \
                   .withColumn("input", F.lit("\t\t EXAMPLE_STRING_VALUE  \t")) \
                   .withColumn("output", F.expr("TRIM(BOTH ' \t' FROM input)")) \
                   .withColumn("output_spark_3_2", F.expr("BTRIM(input, ' \t')"))
df.show(1, 0)
#+----------------------------------+-----------------------------+--------------------+--------------------+
#|id                                |input                        |output              |output_spark_3_2    |
#+----------------------------------+-----------------------------+--------------------+--------------------+
#|trim_tabs_and_spaces_on_both_sides|\t\t EXAMPLE_STRING_VALUE  \t|EXAMPLE_STRING_VALUE|EXAMPLE_STRING_VALUE|
#+----------------------------------+-----------------------------+--------------------+--------------------+

Trim tabs and spaces on the left:

df = spark.range(1).withColumn("id", F.lit("trim_tabs_and_spaces_on_left")) \
                   .withColumn("input", F.lit("\t\t EXAMPLE_STRING_VALUE  \t")) \
                   .withColumn("output", F.expr("TRIM(LEADING ' \t' FROM input)"))
df.show(1, 0)
#+----------------------------+-----------------------------+------------------------+
#|id                          |input                        |output                  |
#+----------------------------+-----------------------------+------------------------+
#|trim_tabs_and_spaces_on_left|\t\t EXAMPLE_STRING_VALUE  \t|EXAMPLE_STRING_VALUE  \t|
#+----------------------------+-----------------------------+------------------------+

Trim tabs and spaces on the right:

df = spark.range(1).withColumn("id", F.lit("trim_tabs_and_spaces_on_right")) \
                   .withColumn("input", F.lit("\t\t EXAMPLE_STRING_VALUE  \t")) \
                   .withColumn("output", F.expr("TRIM(TRAILING ' \t' FROM input)"))
df.show(1, 0)
#+-----------------------------+-----------------------------+-------------------------+
#|id                           |input                        |output                   |
#+-----------------------------+-----------------------------+-------------------------+
#|trim_tabs_and_spaces_on_right|\t\t EXAMPLE_STRING_VALUE  \t|\t\t EXAMPLE_STRING_VALUE|
#+-----------------------------+-----------------------------+-------------------------+

Trim white space on both sides:

df = spark.range(1).withColumn("id", F.lit("trim_white_space_on_both_sides")) \
                   .withColumn("input", F.lit("\t\t EXAMPLE_STRING_VALUE  \t")) \
                   .withColumn("output", F.regexp_replace("input", r"^\s+|\s+$", ""))
df.show(1, 0)
#+------------------------------+-----------------------------+--------------------+
#|id                            |input                        |output              |
#+------------------------------+-----------------------------+--------------------+
#|trim_white_space_on_both_sides|\t\t EXAMPLE_STRING_VALUE  \t|EXAMPLE_STRING_VALUE|
#+------------------------------+-----------------------------+--------------------+

Trim white space on the left:

df = spark.range(1).withColumn("id", F.lit("trim_white_space_on_left")) \
                   .withColumn("input", F.lit("\t\t EXAMPLE_STRING_VALUE  \t")) \
                   .withColumn("output", F.regexp_replace("input", r"^\s+", ""))
df.show(1, 0)
#+------------------------+-----------------------------+------------------------+
#|id                      |input                        |output                  |
#+------------------------+-----------------------------+------------------------+
#|trim_white_space_on_left|\t\t EXAMPLE_STRING_VALUE  \t|EXAMPLE_STRING_VALUE  \t|
#+------------------------+-----------------------------+------------------------+

Trim white space on the right:

df = spark.range(1).withColumn("id", F.lit("trim_white_space_on_right")) \
                   .withColumn("input", F.lit("\t\t EXAMPLE_STRING_VALUE  \t")) \
                   .withColumn("output", F.regexp_replace("input", r"\s+$", ""))
df.show(1, 0)
#+-------------------------+-----------------------------+-------------------------+
#|id                       |input                        |output                   |
#+-------------------------+-----------------------------+-------------------------+
#|trim_white_space_on_right|\t\t EXAMPLE_STRING_VALUE  \t|\t\t EXAMPLE_STRING_VALUE|
#+-------------------------+-----------------------------+-------------------------+

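If your DataFrame has columns of different data types and you only need to trim the string columns, you can do it dynamically as follows:

from pyspark.sql import functions as f
# Get all the string columns from the DataFrame
string_cols = [c for c, t in df.dtypes if t == 'string']
for colname in string_cols:
    df = df.withColumn(colname, f.trim(f.col(colname)))

One benefit of this approach is that it does not convert the data types of the other columns in the DataFrame to string; their existing data types are preserved.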
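
The same "string columns only" idea can also be expressed as a single projection instead of a loop of withColumn calls. The sketch below is one possible way to do that, not something taken from the answers here: build_trim_exprs is a hypothetical helper that turns the (name, type) pairs from df.dtypes into SQL expression strings, which could then be passed to DataFrame.selectExpr. It assumes column names do not themselves contain backticks.

```python
def build_trim_exprs(dtypes):
    """Return SQL expression strings that trim string columns only.

    `dtypes` is a list of (column_name, type_name) pairs, i.e. the shape
    of df.dtypes. Non-string columns are passed through unchanged.
    """
    exprs = []
    for name, type_name in dtypes:
        quoted = f"`{name}`"  # backticks guard names with spaces or dots
        if type_name == "string":
            exprs.append(f"trim({quoted}) AS {quoted}")
        else:
            exprs.append(quoted)
    return exprs


# Made-up schema for illustration; with a real DataFrame use df.dtypes.
example_dtypes = [("id", "bigint"), ("Product", "string"), ("price", "double")]
exprs = build_trim_exprs(example_dtypes)
print(exprs)
# ['`id`', 'trim(`Product`) AS `Product`', '`price`']

# Intended use with an existing DataFrame named df:
# df = df.selectExpr(*build_trim_exprs(df.dtypes))
```

Because the expressions are plain strings handed to selectExpr, this avoids both Python udfs and eval.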

If you need to do this for all columns:

    df = df \
        .select(
            [F.trim(F.col(c)).alias(c) for c in df.columns]
        )

Here is a well-documented function that only operates on string-type columns (safer):

from pyspark.sql import functions as F
def trimColumns(df, columns=None):
    """
    Remove left and right spaces in string column values (only takes effect on string type columns).
    Non-string columns are not affected.
    - Parameters:
      df: The input dataframe.
      columns: The columns on which to remove the side spaces.
               If None, will take all columns from the dataframe but will only work on string type columns.
    - Return:
      The cleaned dataframe.
    """
    if (columns is None):
        columns = df.columns
    columns = [f.name for f in df.schema.fields if f.name in columns and f.jsonValue().get("type")=="string"]
    for column in columns:
        df = df.withColumn(column, F.trim(column))
    return df
# Usage example 1
myDf = trimColumns(myDf)
# Usage example 2
myDf = trimColumns(myDf, myStringColumns)

from pyspark.sql.functions import udf
# Build "trim1('c').alias('c')" for every string column and evaluate it as one select
source_str_col = [i[0] for i in df.dtypes if i[1] == 'string']
new_column_name_list = list(map(lambda x: "trim1('" + x + "').alias('" + x + "'),", source_str_col))
with_col_sour = "".join(new_column_name_list)
def trim(string):
    try:
        return string.strip()
    except AttributeError:
        # Nulls (None) have no strip(); return them unchanged
        return string
trim1 = udf(trim)
df = eval("df.select(" + with_col_sour[:-1] + ")")
df.show()

from pyspark.sql.functions import trim, col
# Build a chain of ".withColumn('c', trim(col('c')))" calls for every string column
source_str_col = [i[0] for i in df.dtypes if i[1] == 'string']
when_source = list(map(lambda x: ".withColumn('" + x + "', trim(col('" + x + "')))", source_str_col))
when_source = "".join(when_source)
df = eval("df" + when_source)
