I have a PySpark DataFrame.
df1 = spark.createDataFrame([
    ("u1", [0, 1, 2]),
    ("u1", [1, 2, 3]),
    ("u2", [2, 3, 4]),
], ['user_id', 'features'])
print(df1.printSchema())
df1.show(truncate=False)
Output -
root
|-- user_id: string (nullable = true)
|-- features: array (nullable = true)
| |-- element: long (containsNull = true)
None
+-------+---------+
|user_id|features |
+-------+---------+
|u1 |[0, 1, 2]|
|u1 |[1, 2, 3]|
|u2 |[2, 3, 4]|
+-------+---------+
I want to get the L2 norm of the features, so I wrote a UDF -
def norm_2_func(features):
    return features / np.linalg.norm(features, 2)
norm_2_udf = udf(norm_2_func, ArrayType(FloatType()))
df2 = df1.withColumn('l2_features', norm_2_udf(F.col('features')))
But it throws an error. How can I do this correctly?
The expected output is -
+-------+---------+----------------------+
|user_id|features | L2_norm|
+-------+---------+----------------------+
|u1 |[0, 1, 2]| [0.000, 0.447, 0.894]|
|u1 |[1, 2, 3]| [0.267, 0.534, 0.801]|
|u2 |[2, 3, 4]| [0.371, 0.557, 0.742]|
+-------+---------+----------------------+
NumPy arrays contain NumPy dtypes, which need to be converted to native Python dtypes (float, int, etc.) before the UDF returns:
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, FloatType
def norm_2_func(features):
    # convert each NumPy scalar to a native Python float
    return [float(i) for i in features / np.linalg.norm(features, 2)]
    # you can also use:
    # return list(map(float, features / np.linalg.norm(features, 2)))
norm_2_udf = F.udf(norm_2_func, ArrayType(FloatType()))
df2 = df1.withColumn('l2_features', norm_2_udf(F.col('features')))
df2.show(truncate=False)
+-------+---------+-----------------------------------+
|user_id|features |l2_features |
+-------+---------+-----------------------------------+
|u1 |[0, 1, 2]|[0.0, 0.4472136, 0.8944272] |
|u1 |[1, 2, 3]|[0.26726124, 0.5345225, 0.80178374]|
|u2 |[2, 3, 4]|[0.37139067, 0.557086, 0.74278134] |
+-------+---------+-----------------------------------+
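The dtype mismatch can be reproduced in plain NumPy, without Spark at all. This is a minimal sketch of why the explicit float() conversion is needed (the names vec, normalized, and as_python are just for illustration):

```python
import numpy as np

# Dividing an integer vector by its L2 norm yields an ndarray of np.float64.
vec = [0, 1, 2]
normalized = np.array(vec) / np.linalg.norm(vec, 2)

# Each element is a NumPy scalar type, not the plain Python float that
# Spark's FloatType serializer expects from the UDF.
print(type(normalized[1]))  # <class 'numpy.float64'>

# Converting element-wise produces native Python floats that Spark accepts.
as_python = [float(x) for x in normalized]
print(type(as_python[1]))   # <class 'float'>
```

As a side note, on Spark 2.4+ this particular computation can also be done without a Python UDF by combining the higher-order SQL functions transform and aggregate through F.expr, which avoids the Python serialization round-trip entirely.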