PySpark PythonUDF缺少输入属性



我试图使用Spark SQL数据框架读取一些数据,并将一堆文本清理函数应用到每一行。

import langid
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.sql import HiveContext
hsC = HiveContext(sc)
df = hsC.sql("select * from sometable")
def check_lang(data_str):
    language = langid.classify(data_str)
    # only english
    record = ''
    if language[0] == 'en':
        # probability of correctly id'ing the language greater than 90%
        if language[1] > 0.9:
            record = data_str
    return record
check_lang_udf = udf(lambda x: check_lang(x), StringType())
clean_df = df.select("Field1", check_lang_udf("TextField"))

然而,当我试图运行这个,我得到以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o31.select.
: java.lang.AssertionError: assertion failed: Unable to evaluate PythonUDF.  Missing input attributes

我花了很多时间来收集更多的信息,但是我什么也找不到。

作为旁注,我知道下面的代码可以工作,但我想继续使用数据框架。

removeNonEn = data.map(lambda record: (record[0], check_lang(record[1])))

我还没有尝试过这个代码,但从API文档建议这应该工作:

hsC.registerFunction("check_lang", check_lang)
clean_df = df.selectExpr("Field1", "check_lang('TextField')")

相关内容

  • 没有找到相关文章

最新更新