Apply a plain Python function to a PySpark DataFrame


import re

def cleanTweets(text):
    text = re.sub(r'@[A-Za-z0-9]+', '', text)  # remove the mentions
    text = re.sub(r'#', '', text)  # remove the #
    text = re.sub(r'RT\s+', '', text)  # remove the RT
    text = re.sub(r'https?://\S+', '', text)  # remove hyperlinks
    return text

tweets_df_cleaned = tweets_df.withColumn('Tweets',col(udf(cleanTweets(Text))))

How can I apply this to tweets_df, which has a column Text that needs cleaning? In pandas this can be done with apply.


If tweets_df is a pandas DataFrame, you can use the apply method:
tweets_df['Tweets_New'] = tweets_df['Tweets'].apply(cleanTweets)
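On a PySpark DataFrame, columns have no apply method; the closest equivalent is to wrap the function in a regular (row-at-a-time) UDF. The following is a minimal sketch, assuming the column is named Text as in the question. The helper name add_cleaned_column and the None check are additions for illustration, not part of the original code.

```python
import re


def clean_tweets(text):
    """Strip mentions, '#', the RT marker and hyperlinks from one tweet."""
    if text is None:  # assumption: pass nulls through instead of failing in re.sub
        return None
    text = re.sub(r'@[A-Za-z0-9]+', '', text)  # remove the mentions
    text = re.sub(r'#', '', text)              # remove the #
    text = re.sub(r'RT\s+', '', text)          # remove the RT
    text = re.sub(r'https?://\S+', '', text)   # remove hyperlinks
    return text


def add_cleaned_column(df, src="Text", dst="Tweets"):
    """Hypothetical helper: add column `dst` holding the cleaned `src` column."""
    # Imported here so clean_tweets can be tried out without a Spark install.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    clean_udf = udf(clean_tweets, StringType())
    return df.withColumn(dst, clean_udf(col(src)))
```

Usage would then look like tweets_df_cleaned = add_cleaned_column(tweets_df). A row-at-a-time UDF is simple but serializes every value between the JVM and Python, so it tends to be slower than a pandas UDF on large data.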

Use a pandas UDF (user-defined function). Check your Spark version first, since this solution applies to Spark 3.x.

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Spark 3.x type-hint style: the function receives a pandas Series per batch
@pandas_udf("string")
def cleanTweets(text: pd.Series) -> pd.Series:
    text = text.str.replace(r'@[A-Za-z0-9]+', '', regex=True)  # remove the mentions
    text = text.str.replace(r'#', '', regex=True)  # remove the #
    text = text.str.replace(r'RT\s+', '', regex=True)  # remove the RT
    text = text.str.replace(r'https?://\S+', '', regex=True)  # remove hyperlinks
    return text

tweets_df_cleaned = tweets_df.withColumn("Tweets", cleanTweets("Text"))
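Since the cleaning consists only of regex removals, it may also be possible to skip the UDF entirely and use Spark's built-in regexp_replace. The sketch below is an alternative, not what the answer above proposes: it folds the four patterns into one alternation, which should give the same result for typical tweets, though edge cases can differ from applying the substitutions one after another. The names TWEET_NOISE and clean_with_builtin are hypothetical.

```python
# One pattern covering mentions, '#', the RT marker and hyperlinks.
# The syntax used here is valid in both Python's re and Java regex (used by Spark).
TWEET_NOISE = r'@[A-Za-z0-9]+|#|RT\s+|https?://\S+'


def clean_with_builtin(df, src="Text", dst="Tweets"):
    """Hypothetical helper: clean column `src` without a Python UDF."""
    # Imported here so the pattern can be checked without a Spark install.
    from pyspark.sql.functions import col, regexp_replace

    return df.withColumn(dst, regexp_replace(col(src), TWEET_NOISE, ""))
```

Because the work stays inside the JVM, this avoids moving data to Python workers; it only fits when the cleaning can be expressed as regex substitutions.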
