Pyspark n00b...如何将列替换为自身的子字符串?我正在尝试从字符串的开头和结尾删除选定数量的字符。
from pyspark.sql.functions import substring
import pandas as pd
pdf = pd.DataFrame({'COLUMN_NAME':['_string_','_another string_']})
# this is what i'm looking for...
pdf['COLUMN_NAME_fix']=pdf['COLUMN_NAME'].str[1:-1]
df = sqlContext.createDataFrame(pdf)
# following not working... COLUMN_NAME_fix is blank
df.withColumn('COLUMN_NAME_fix', substring('COLUMN_NAME', 1, -1)).show()
这是非常接近但略有不同的 Spark 数据帧列与其他列的最后一个字符。然后是这个PySpark SQL
pyspark.sql.functions.substring(str, pos, len)
子字符串从 pos 开始,当 str 为字符串类型时为长度 len,或者返回从字节中的 pos 开始的字节数组切片,当 str 为二进制类型时
,长度为 len
在您的代码中,
df.withColumn('COLUMN_NAME_fix', substring('COLUMN_NAME', 1, -1))
1 is pos and -1 becomes len, length can't be -1 and so it returns null
试试这个,(使用固定语法)
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
udf1 = udf(lambda x:x[1:-1],StringType())
df.withColumn('COLUMN_NAME_fix',udf1('COLUMN_NAME')).show()
请尝试:
df.withColumn('COLUMN_NAME_fix', df['COLUMN_NAME'].substr(1, 10)).show()
其中 1 = 字符串中的起始位置和 10 = 从起始位置(含)开始要包含的字符数
接受的答案使用udf
(用户定义的函数),它通常(远)比本机 Spark 代码慢得多。Grant Shannon的回答确实使用了原生火花代码,但正如citynorman的评论中所指出的那样,对于可变字符串长度,这并不是100%清楚它是如何工作的。
使用本机火花代码(无 udf)和可变字符串长度回答
从 pyspark 中 substr 的文档,我们可以看到参数:startPos 和 length 可以是 int
或 Column
类型(两者都必须是相同的类型)。因此,我们只需要创建一个包含字符串长度的列并将其用作参数。
import pyspark.sql.functions as F
result = (
df
.withColumn('length', F.length('COLUMN_NAME'))
.withColumn('fixed_in_spark', F.col('COLUMN_NAME').substr(F.lit(2), F.col('length') - F.lit(2)))
)
# result:
+----------------+---------------+----+--------------+
| COLUMN_NAME|COLUMN_NAME_fix|size|fixed_in_spark|
+----------------+---------------+----+--------------+
| _string_| string| 8| string|
|_another string_| another string| 16|another string|
+----------------+---------------+----+--------------+
注意:
- 我们使用长度 - 2,因为我们从第二个字符开始(并且需要所有内容直到最后一个第二个字符)。
- 我们需要使用
F.lit
,因为我们不能在Column
对象中添加(或减去)数字。我们需要首先将该数字转换为Column
.
如果目标是从列名中删除"_",那么我会改用列表理解:
df.select(
[ col(c).alias(c.replace('_', '') ) for c in df.columns ]
)
SQL 替代方案
df = spark.sql("SELECT
COLUMN_NAME,
LENGTH(COLUMN_NAME) AS length,
SUBSTRING(COLUMN_NAME, 2, LENGTH(COLUMN_NAME) - 2) AS fixed_in_sql
FROM
your_table")