pyspark-基于语言对行进行过滤



此问题与pyspark有关。我正在阅读一个很少列的TSV文件。一个特定的列是注释列。我的任务是根据语言过滤行。例如,如果评论是用俄语语言的,那么我想过滤该特定行并将其保存在单独的文件中。

现在,在阅读文件时,我正在使用以下代码,该代码正在制作数据框。

Info = sqlContext.read.format("csv"). 
option("delimiter","t"). 
option("header", "True"). 
option("inferSchema", "True"). 
load("file.tsv")
DataFrame[ID: int Comments: string]

然后,我试图使用ORD函数根据ASCII值过滤记录:

Info.filter((map(ord,Info.Comments)) < 128).collect()

但是,我遇到了一个错误:

typeerror:参数2 to map()必须支持迭代

样本输入:

Comments
{175:'Аксессуары'}
{156:'Горные'}
{45:'Кровати, диваны и кресла'}
{45:'Кровати, диваны и кресла'}

请建议一些解决方案。感谢任何帮助/建议。

更新:

@ags29

我通过编写此代码来纠正我在评论中提到的错误。

spark_ord=F.udf(lambda x: [ord(c) for c in x],t.ArrayType(IntegerType()))
Info=Info.withColumn('russ', spark_ord('Comments'))
DataFrame[ID: int, Comments: string, russ: array<int>]

现在问题是在创建数组[int]。我必须根据数组中存在的值小于128的值过滤整个行。

我正在努力实现这一目标。请建议。

@ags29感谢您的建议。

这是答案:

通过读取上述文件创建数据框后,我们必须用某些值替换空值,在这种情况下,我将其替换为na。

InfoWoNull = Info.fillna({'Comments':'NA'})

然后,使用ORD函数创建UDF以找到字符串中每个字符的ASCII值。输出将是整数的数组。

from pyspark.sql import functions as F
from pyspark.sql import types as t
from pyspark.sql.types import ArrayType, IntegerType
russ_ord=F.udf(lambda x: [ord(a) for a in x],t.ArrayType(IntegerType()))

创建过滤器函数以根据ASCII字符大于127。

过滤值。
def russian_filter(x):
for index in range(len(x)):
    if x[index] > 127:
    return True
return False
filter_udf = F.udf(russian_filter, BooleanType())

在下面的最后一步中使用它。

Info_rus = InfoWoNull.filter(filter_udf(russ_ord('SearchParams')) == 'true')
Info_rus.show()

这没有测试,但是这些行的某些内容应起作用:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# create user defined function from ord
spark_ord=udf(lambda x: ord(x), IntegerType())
Info=Info.withColumn('ord', spark_ord('Comments'))
Info=Info.filter('ord<128')

基本上,要将ord函数与DataFrame一起使用,您需要用户定义的函数。您尝试的方法需要RDD,而不是DataFrame

相关内容

  • 没有找到相关文章

最新更新