I have a column in a pyspark df that contains an array of maps like the below:
[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]
df.printSchema()
returns the following for that column:
|-- constituencies: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- address: string (nullable = true)
| | |-- city: string (nullable = true)
| | |-- country: string (nullable = true)
| | |-- note: string (nullable = true)
| | |-- stateProvince: string (nullable = true)
I want to null out all of those empty strings. So I figured this would be the perfect problem to solve with F.transform(col, f).
So I created this function, which I then use in the transform expression below:
def nullify_vals(d):
    def nullify_string(str_):
        if str_.strip() == "":
            return None
        return str_.strip()

    return dict((k, nullify_string(v)) for k, v in d.items())
Note that the above works when tested on a dictionary:
dd = {"my": "map", "is": "", "not": " ", "entierly": " empty , right?"}
d_cln = nullify_vals(dd)
d_cln["not"] is None # returns True
But when I use it in Pyspark, it gives me an error:
import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))
TypeError: 'Column' object is not callable
These are the last lines of the stacktrace:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))
File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
4214 def transform(col, f):
4215 """
4216 Returns an array of elements after applying a transformation to each element in the input array.
4217
(...)
4258 +--------------+
4259 """
-> 4260 return _invoke_higher_order_function("ArrayTransform", [col], [f])
File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
4206 expr = getattr(expressions, name)
4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))
The function nullify_vals should take a Column object of type StructType, since your array elements are structs. But you're passing it a normal Python object.
Try rewriting it like this:
from typing import List
from pyspark.sql import functions as F, Column

def nullify_vals(struct_col: Column, fields: List[str]) -> Column:
    for f in fields:
        struct_col = struct_col.withField(
            f,
            F.when(F.trim(struct_col[f]) == "", None).otherwise(struct_col[f])
        )
    return struct_col
For each field in the inner struct, we use the Column method withField to update it: if the field equals an empty string (after trimming), we set it to null.
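As a standalone illustration of that mechanism, here's a minimal sketch (the df_demo dataframe and its two-field struct are made up for the example):
from pyspark.sql import functions as F

# toy struct column with one blank field
df_demo = spark.createDataFrame([(("Fadden", ""),)], "s struct<address:string,city:string>")

df_demo.select(
    F.col("s").withField(
        "city",
        F.when(F.trim(F.col("s")["city"]) == "", None).otherwise(F.col("s")["city"])
    ).alias("s")
).show(truncate=False)
#+--------------+
#|s             |
#+--------------+
#|{Fadden, null}|
#+--------------+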
Applied to your input example:
json_str = '{"constituencies":[{"address":"Fadden","city":"","country":"","note":"","stateProvince":"Queensland"}]}'
df = spark.read.json(spark.sparkContext.parallelize([json_str]))
You can get the list of the constituencies struct fields from the dataframe schema:
constituencies_fields = df.selectExpr("inline(constituencies)").columns
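If you prefer not to scan any rows to get those names, an equivalent way is to read them directly off the schema (standard StructType/ArrayType accessors):
constituencies_fields = [
    f.name for f in df.schema["constituencies"].dataType.elementType.fields
]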
df1 = df.withColumn(
"constituencies",
F.transform("constituencies", lambda x: nullify_vals(x, constituencies_fields))
)
df1.show(truncate=False)
#+----------------------------------------+
#|constituencies |
#+----------------------------------------+
#|[{Fadden, null, null, null, Queensland}]|
#+----------------------------------------+
I'm still investigating the error you got, and I'll update the post when I find the problem. In the meantime, you can do something like this as a workaround:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
schema = ArrayType(
StructType([
StructField('address', StringType()),
StructField('city', StringType()),
StructField('country', StringType()),
StructField('note', StringType()),
StructField('stateProvince', StringType()),
]), True)
# guard with `v and ...` so fields that are already null don't blow up on .strip()
nullify_udf = udf(
    lambda arr: [[(v if v and v.strip() != "" else None) for v in area] for area in arr],
    schema,
)
result = kyclean.withColumn('constituencies', nullify_udf('constituencies'))
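Given the sample row from your question, this workaround should produce the same cleaned result as the transform-based approach:
result.show(truncate=False)
#+----------------------------------------+
#|constituencies                          |
#+----------------------------------------+
#|[{Fadden, null, null, null, Queensland}]|
#+----------------------------------------+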
The specific error you got is that you can't call d.items() as a function, because the function you pass in really needs to deal with the Column object d that gets passed in.
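You can see why with a quick check: attribute access on a Column is treated as nested-field access and returns another Column, which is not callable:
import pyspark.sql.functions as F

d = F.col("constituencies")
print(type(d.items))  # <class 'pyspark.sql.column.Column'>
d.items()             # TypeError: 'Column' object is not callable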
The description of transform says it "Returns an array of elements after applying a transformation to each element in the input array."
But in the description of the accepted function f, it says it "...can use methods of Column, functions defined in pyspark.sql.functions and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052)." So it can't yet take custom Python UserDefinedFunctions, which is essentially what you were trying to do.
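For reference, a minimal sketch of the style transform does accept: the lambda receives each array element as a Column and must build its result out of Column operations (this mirrors the withField approach from the other answer):
from pyspark.sql import functions as F

result = kyclean.select(
    F.transform(
        "constituencies",
        # s is a Column for one struct element, not a Python dict
        lambda s: s.withField(
            "address",
            F.when(F.trim(s["address"]) == "", None).otherwise(s["address"])
        ),
    ).alias("constituencies")
)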