我需要从不同数据集中删除西班牙语和其他语言字符的重音。
我已经根据本文中提供的代码做了一个功能,可以删除特殊的重音。问题是该功能很慢,因为它使用了 UDF
.我只是想知道我是否可以提高函数的性能以在更短的时间内获得结果,因为这对小数据帧有好处,但对大数据帧不利。
提前谢谢。
在这里,代码,您将能够按显示的方式运行它:
# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf, col
import unicodedata
# Building a simple dataframe:
schema = StructType([StructField("city", StringType(), True),
StructField("country", StringType(), True),
StructField("population", IntegerType(), True)])
countries = ['Venezuela', 'US@A', 'Brazil', 'Spain']
cities = ['Maracaibó', 'New York', ' São Paulo ', '~Madrid']
population = [37800000,19795791,12341418,6489162]
# Dataframe:
df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
df.show()
class Test():
def __init__(self, df):
self.df = df
def clearAccents(self, columns):
"""This function deletes accents in strings column dataFrames,
it does not eliminate main characters, but only deletes special tildes.
:param columns String or a list of column names.
"""
# Filters all string columns in dataFrame
validCols = [c for (c, t) in filter(lambda t: t[1] == 'string', self.df.dtypes)]
# If None or [] is provided with column parameter:
if (columns == "*"): columns = validCols[:]
# Receives a string as an argument
def remove_accents(inputStr):
# first, normalize strings:
nfkdStr = unicodedata.normalize('NFKD', inputStr)
# Keep chars that has no other char combined (i.e. accents chars)
withOutAccents = u"".join([c for c in nfkdStr if not unicodedata.combining(c)])
return withOutAccents
function = udf(lambda x: remove_accents(x) if x != None else x, StringType())
exprs = [function(col(c)).alias(c) if (c in columns) and (c in validCols) else c for c in self.df.columns]
self.df = self.df.select(*exprs)
foo = Test(df)
foo.clearAccents(columns="*")
foo.df.show()
一个可能的改进是构建一个自定义Transformer
,它将处理Unicode规范化和相应的Python包装器。它应该减少在JVM和Python之间传递数据的总体开销,并且不需要对Spark本身进行任何修改或访问私有API。
在 JVM 端,你需要一个类似于这个的转换器:
package net.zero323.spark.ml.feature
import java.text.Normalizer
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.types.{DataType, StringType}
class UnicodeNormalizer (override val uid: String)
extends UnaryTransformer[String, String, UnicodeNormalizer] {
def this() = this(Identifiable.randomUID("unicode_normalizer"))
private val forms = Map(
"NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD,
"NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD
)
val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)",
ParamValidators.inArray(forms.keys.toArray))
def setN(value: String): this.type = set(form, value)
def getForm: String = $(form)
setDefault(form -> "NFKD")
override protected def createTransformFunc: String => String = {
val normalizerForm = forms($(form))
(s: String) => Normalizer.normalize(s, normalizerForm)
}
override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == StringType, s"Input type must be string type but got $inputType.")
}
override protected def outputDataType: DataType = StringType
}
相应的构建定义(调整 Spark 和 Scala 版本以匹配您的 Spark 部署):
name := "unicode-normalization"
version := "1.0"
crossScalaVersions := Seq("2.11.12", "2.12.8")
organization := "net.zero323"
val sparkVersion = "2.4.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
)
在 Python 方面,你需要一个类似于这个的包装器。
from pyspark.ml.param.shared import *
# from pyspark.ml.util import keyword_only # in Spark < 2.0
from pyspark import keyword_only
from pyspark.ml.wrapper import JavaTransformer
class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, form="NFKD", inputCol=None, outputCol=None):
super(UnicodeNormalizer, self).__init__()
self._java_obj = self._new_java_obj(
"net.zero323.spark.ml.feature.UnicodeNormalizer", self.uid)
self.form = Param(self, "form",
"unicode form (one of NFC, NFD, NFKC, NFKD)")
# kwargs = self.__init__._input_kwargs # in Spark < 2.0
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, form="NFKD", inputCol=None, outputCol=None):
# kwargs = self.setParams._input_kwargs # in Spark < 2.0
kwargs = self._input_kwargs
return self._set(**kwargs)
def setForm(self, value):
return self._set(form=value)
def getForm(self):
return self.getOrDefault(self.form)
构建 Scala 包:
sbt +package
在启动 shell 或提交时包含它。例如,对于使用 Scala 2.11 构建的 Spark
:bin/pyspark --jars path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar
--driver-class-path path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar
你应该准备好了。剩下的只是一点正则表达式魔法:
from pyspark.sql.functions import regexp_replace
normalizer = UnicodeNormalizer(form="NFKD",
inputCol="text", outputCol="text_normalized")
df = sc.parallelize([
(1, "Maracaibó"), (2, "New York"),
(3, " São Paulo "), (4, "~Madrid")
]).toDF(["id", "text"])
(normalizer
.transform(df)
.select(regexp_replace("text_normalized", "p{M}", ""))
.show())
## +--------------------------------------+
## |regexp_replace(text_normalized,p{M},)|
## +--------------------------------------+
## | Maracaibo|
## | New York|
## | Sao Paulo |
## | ~Madrid|
## +--------------------------------------+
请注意,这遵循与内置文本转换器相同的约定,并且不是空安全的。您可以通过检查createTransformFunc
中的null
来轻松更正此问题。
另一种使用 python Unicode 数据库的方法:
import unicodedata
import sys
from pyspark.sql.functions import translate, regexp_replace
def make_trans():
matching_string = ""
replace_string = ""
for i in range(ord(" "), sys.maxunicode):
name = unicodedata.name(chr(i), "")
if "WITH" in name:
try:
base = unicodedata.lookup(name.split(" WITH")[0])
matching_string += chr(i)
replace_string += base
except KeyError:
pass
return matching_string, replace_string
def clean_text(c):
matching_string, replace_string = make_trans()
return translate(
regexp_replace(c, "p{M}", ""),
matching_string, replace_string
).alias(c)
所以现在让我们测试一下:
df = sc.parallelize([
(1, "Maracaibó"), (2, "New York"),
(3, " São Paulo "), (4, "~Madrid"),
(5, "São Paulo"), (6, "Maracaibó")
]).toDF(["id", "text"])
df.select(clean_text("text")).show()
## +---------------+
## | text|
## +---------------+
## | Maracaibo|
## | New York|
## | Sao Paulo |
## | ~Madrid|
## | Sao Paulo|
## | Maracaibo|
## +---------------+
确认@zero323
此解决方案仅适用于 Python,但仅在可能的重音数量较少(例如,一种语言如西班牙语)并且手动指定字符替换时才有用。
似乎没有内置的方法可以直接在没有 UDF 的情况下执行您要求的操作,但是您可以链接许多regexp_replace
调用来替换每个可能的重音字符。我测试了这个解决方案的性能,结果发现,只有当你有一组非常有限的口音要替换时,它才会运行得更快。如果是这种情况,它可以比UDF更快,因为它是在Python之外优化的。
from pyspark.sql.functions import col, regexp_replace
accent_replacements_spanish = [
(u'á', 'a'), (u'Á', 'A'),
(u'é', 'e'), (u'É', 'E'),
(u'í', 'i'), (u'Í', 'I'),
(u'ò', 'o'), (u'Ó', 'O'),
(u'ú|ü', 'u'), (u'Ú|Ű', 'U'),
(u'ñ', 'n'),
# see http://stackoverflow.com/a/18123985/3810493 for other characters
# this will convert other non ASCII characters to a question mark:
('[^x00-x7F]', '?')
]
def remove_accents(column):
r = col(column)
for a, b in accent_replacements_spanish:
r = regexp_replace(r, a, b)
return r.alias('remove_accents(' + column + ')')
df = sqlContext.createDataFrame([['Olà'], ['Olé'], ['Núñez']], ['str'])
df.select(remove_accents('str')).show()
我没有将性能与其他响应进行比较,这个函数也不是那么通用,但至少值得考虑,因为你不需要将 Scala 或 Java 添加到你的构建过程中。
的实现。除了重音,我还删除了特殊字符。因为我需要透视并保存一个表,而你不能保存列名有" ,;{}()\t=\/" 字符。
import re
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
from unidecode import unidecode
spark = SparkSession.builder.getOrCreate()
data = [(1, " \ / \ {____} aŠdá_ t = n () asd ____aa 2134_ 23_"), (1, "N"), (2, "false"), (2, "1"), (3, "NULL"),
(3, None)]
schema = StructType([StructField("id", IntegerType(), True), StructField("txt", StringType(), True)])
df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
df.show()
for col_name in ["txt"]:
tmp_dict = {}
for col_value in [row[0] for row in df.select(col_name).distinct().toLocalIterator()
if row[0] is not None]:
new_col_value = re.sub("[ ,;{}()\n\t=\/]", "_", col_value)
new_col_value = re.sub('_+', '_', new_col_value)
if new_col_value.startswith("_"):
new_col_value = new_col_value[1:]
if new_col_value.endswith("_"):
new_col_value = new_col_value[:-1]
new_col_value = unidecode(new_col_value)
tmp_dict[col_value] = new_col_value.lower()
df = df.na.replace(to_replace=tmp_dict, subset=[col_name])
df.show()
如果您无法访问外部图书馆(像我一样),您可以将 UnideCode 替换为
new_col_value = new_col_value.translate(str.maketrans(
"ä,ö,ü,ẞ,á,ä,č,ď,é,ě,í,ĺ,ľ,ň,ó,ô,ŕ,š,ť,ú,ů,ý,ž,Ä,Ö,Ü,ẞ,Á,Ä,Č,Ď,É,Ě,Í,Ĺ,Ľ,Ň,Ó,Ô,Ŕ,Š,Ť,Ú,Ů,Ý,Ž",
"a,o,u,s,a,a,c,d,e,e,i,l,l,n,o,o,r,s,t,u,u,y,z,A,O,U,S,A,A,C,D,E,E,I,L,L,N,O,O,R,S,T,U,U,Y,Z"))