PySpark: use sql.transform to nullify all empty strings in a column containing an array of structs



I have a column in a PySpark df that contains an array of maps like the one below:

[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]

df.printSchema() returns the following for that column:

|-- constituencies: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- address: string (nullable = true)
|    |    |-- city: string (nullable = true)
|    |    |-- country: string (nullable = true)
|    |    |-- note: string (nullable = true)
|    |    |-- stateProvince: string (nullable = true)

I want to nullify all those empty strings. So I thought this would be a perfect problem to solve with F.transform(col, f).

So I created this function, which I then use in the transform expression below:

def nullify_vals(d):
    def nullify_string(str_):
        if str_.strip() == "":
            return None
        return str_.strip()

    return dict((k, nullify_string(v)) for k, v in d.items())

Note that the above works when tested on a dictionary:

dd = {"my": "map", "is": "", "not": "   ", "entierly": "  empty , right?"}
d_cln = nullify_vals(dd)  
d_cln["not"] is None # returns True

But when I use it in PySpark, it gives me an error:

import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))

TypeError: 'Column' object is not callable

These are the last lines of the stack trace:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))
File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
4214 def transform(col, f):
4215     """
4216     Returns an array of elements after applying a transformation to each element in the input array.
4217 
(...)
4258     +--------------+
4259     """
-> 4260     return _invoke_higher_order_function("ArrayTransform", [col], [f])
File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
4206 expr = getattr(expressions, name)
4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))

The function nullify_vals should take a Column object of type StructType, because the array elements are structs. But you're passing a regular Python object instead.

Try rewriting it like this:

from typing import List
from pyspark.sql import functions as F, Column

def nullify_vals(struct_col: Column, fields: List[str]) -> Column:
    for f in fields:
        struct_col = struct_col.withField(
            f,
            F.when(F.trim(struct_col[f]) == "", None).otherwise(struct_col[f])
        )
    return struct_col

For each field in the inner struct, we use the Column method withField to update it: if the field equals the empty string (after trimming), we set it to null.
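As a standalone illustration of that mechanism, here is a minimal sketch on a made-up one-row dataframe with a single struct column s (the data and column name are invented for the example):

from pyspark.sql import functions as F

demo = spark.createDataFrame([(("Fadden", ""),)], "s struct<address:string, city:string>")
demo.withColumn(
    "s",
    F.col("s").withField(
        "city",
        F.when(F.trim(F.col("s")["city"]) == "", None).otherwise(F.col("s")["city"])
    )
).show()
#+--------------+
#|             s|
#+--------------+
#|{Fadden, null}|
#+--------------+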

Applied to your input example:

json_str = '{"constituencies":[{"address":"Fadden","city":"","country":"","note":"","stateProvince":"Queensland"}]}'
df = spark.read.json(spark.sparkContext.parallelize([json_str]))

You can get the list of fields of the constituencies struct from the dataframe schema:

constituencies_fields = df.selectExpr("inline(constituencies)").columns

df1 = df.withColumn(
    "constituencies",
    F.transform("constituencies", lambda x: nullify_vals(x, constituencies_fields))
)
df1.show(truncate=False)
#+----------------------------------------+
#|constituencies                          |
#+----------------------------------------+
#|[{Fadden, null, null, null, Queensland}]|
#+----------------------------------------+
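If you prefer not to run a select just to list the fields, the same list can be read off the schema object directly (my addition, not part of the original answer):

constituencies_fields = df.schema["constituencies"].dataType.elementType.fieldNames()
# ['address', 'city', 'country', 'note', 'stateProvince']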

I'm still investigating the error you got, and I'll update the post once I find the problem. In the meantime, you can work around it with something like this:

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = ArrayType(
    StructType([
        StructField('address', StringType()),
        StructField('city', StringType()),
        StructField('country', StringType()),
        StructField('note', StringType()),
        StructField('stateProvince', StringType()),
    ]), True)

nullify_udf = udf(lambda arr: [[(v if v.strip() != "" else None) for v in area] for area in arr], schema)

result = kyclean.withColumn('constituencies', nullify_udf('constituencies'))
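One caveat with this workaround: v.strip() raises an AttributeError when a field value is already null, and the array itself can also be null. A null-safe variant of the same UDF could look like this (a sketch under those assumptions, reusing the schema defined above):

def clean_area(area):
    # each array element arrives in the UDF as a Row; rebuild it as a list,
    # mapping null and blank/whitespace-only strings to None
    return [None if v is None or v.strip() == "" else v.strip() for v in area]

nullify_udf_safe = udf(lambda arr: None if arr is None else [clean_area(area) for area in arr], schema)
result = kyclean.withColumn('constituencies', nullify_udf_safe('constituencies'))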

The specific error you got is that you can't call d.items() as a function: attribute access on a Column is interpreted as struct-field access, so d.items is itself a Column, and a Column is not callable. The function you pass in really does need to operate on the Column object d that transform hands it.
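You can reproduce that mechanism in isolation (a minimal sketch):

from pyspark.sql import functions as F

elem = F.col("constituencies")[0]
print(type(elem.items))  # <class 'pyspark.sql.column.Column'>: attribute access became field access
elem.items()             # TypeError: 'Column' object is not callable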

The description of F.transform says it "Returns an array of elements after applying a transformation to each element in the input array."

But the description of the accepted function f says it "... can use methods of Column, functions defined in pyspark.sql.functions and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052)." So it cannot yet take custom Python UserDefinedFunctions, which is essentially what you were trying to do.
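For contrast, here is the kind of function f that transform does accept: one built purely from Column methods and pyspark.sql.functions. A minimal sketch on a made-up array-of-strings column:

from pyspark.sql import functions as F

df2 = spark.createDataFrame([(["a", "", "  b  "],)], "tags array<string>")
df2.select(
    # supported: the lambda uses only Column operations, no Python UDF
    F.transform("tags", lambda s: F.when(F.trim(s) == "", None).otherwise(F.trim(s))).alias("tags")
).show(truncate=False)
#+------------+
#|tags        |
#+------------+
#|[a, null, b]|
#+------------+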
