我在Spark中有一个数据帧,其中一列包含一个数组。现在,我已经编写了一个单独的UDF,它将数组转换为另一个只包含不同值的数组。参见以下示例:
例如:[24,23,27,23]应转换为[34,23,27]代码:
def uniq_array(col_array):
x = np.unique(col_array)
return x
uniq_array_udf = udf(uniq_array,ArrayType(IntegerType()))
Df3 = Df2.withColumn("age_array_unique",uniq_array_udf(Df2.age_array))
在上面的代码中,Df2.age_array
是我在其上应用UDF以获得不同列"age_array_unique"
的数组,该列应该只包含数组中的唯一值。
然而,一旦我运行命令Df3.show()
,我就会得到错误:
net.razorvine.pickle.PickleException:构造ClassDict(对于numpy.core.multirarray._reconstruct(需要零个参数
有人能告诉我为什么会发生这种事吗?
谢谢!
问题的根源是从UDF返回的对象不符合声明的类型。np.unique
不仅返回numpy.ndarray
,而且将数字转换为与DataFrame
API不兼容的相应NumPy
类型。你可以试试这样的东西:
udf(lambda x: list(set(x)), ArrayType(IntegerType()))
或者这个(为了维持秩序(
udf(lambda xs: list(OrderedDict((x, None) for x in xs)),
ArrayType(IntegerType()))
相反。
如果你真的想要np.unique
,你必须转换输出:
udf(lambda x: np.unique(x).tolist(), ArrayType(IntegerType()))
您需要将最终值转换为python列表。您实现的功能如下:
def uniq_array(col_array):
x = np.unique(col_array)
return list(x)
这是因为Spark不理解numpy数组格式。为了提供Spark DataFrames理解为ArrayType
的python对象,您需要在返回输出之前将其转换为python list
。
当我的UDF返回一个float,但我忘记将其强制转换为float时,我也遇到了这个错误。我需要这样做:
retval = 0.5
return float(retval)
从pyspark版本2.4开始,您可以使用array_dispersity转换
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.array_distinct
下面对我来说很好
udf(lambda x: np.unique(x).tolist(), ArrayType(IntegerType()))
[x.item() for x in <any numpy array>]
将其转换为普通python。