如果我有一个包含arrays
的Spark DataFrame
,我可以通过UDF在这些数组上使用Python列表方法吗?如何将Spark DataFrame
array<double>
转换为Python列表?
下面是一个带有几个udf的示例。我不知道为什么采取最大的工作,但采取len
没有。最后,我想用原始数组列的采样值创建一个新列。这也会导致期望两个参数的错误,如果你也能帮助解决这个问题,那就加分了!
我有以下Spark DataFrame
:
from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
import random
df = sc.parallelize([Row(name='Joe',scores=[1.0,2.0,3.0]),
Row(name='Mary', scores=[3.0]),
Row(name='Mary', scores=[4.0,7.1])]).toDF()
>>> df.show()
+----+---------------+
|name| scores|
+----+---------------+
| Joe|[1.0, 2.0, 3.0]|
|Mary| [3.0]|
|Mary| [4.0, 7.1]|
+----+---------------+
>>> df
DataFrame[name: string, scores: array<double>]
def sampleWithReplacement(listIn,samples):
tempList = array()
count=0
while (count<samples):
tempList.append(random.sample(listIn,1)[0])
count=count+1
return tempList
def maxArray(listIn):
return max(listIn)
def lenArray(listIn):
return len(listIn)
sampUDF=udf(sampleWithReplacement,ArrayType())
maxUDF=udf(maxArray,IntegerType())
lenUDF=udf(lenArray,IntegerType())
>>> df.withColumn("maxCol",maxUDF(df.scores)).show()
+----+---------------+------+
|name| scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]| null|
|Mary| [3.0]| null|
|Mary| [4.0, 7.1]| null|
+----+---------------+------+
>>> df.withColumn("maxCol",lenUDF(df.scores)).show()
+----+---------------+------+
|name| scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]| 3|
|Mary| [3.0]| 1|
|Mary| [4.0, 7.1]| 2|
+----+---------------+------+
TL;DR当您有选择时,总是更喜欢内置函数而不是udf
。使用size
(别名length)
)方法计算长度:
from pyspark.sql.functions import length, size
df.withColumn("len", size("scores"))
对于小数组,可以尝试
from pyspark.sql.functions import sort_array
df.withColumn("max", sort_array("scores", False)[0])
但是对于大型集合来说,这当然不是一个好的选择。
Spark DataFrame数组与Python列表不同吗?
在内部它们是不同的,因为有Scala对象。当在udf
中访问时,有一个普通的Python列表。那么到底出了什么问题呢?
让我们看一下类型。scores
列为array<double>
。当转换为Python类型时,结果是List[float]
。当您调用max
时,您将在输出上获得float
。
但是你声明返回类型为IntegerType
。由于float
不能转换为整数,因此精度损失结果是未定义的,您得到NULL
。返回类型的正确选择是DoubleType
或FloatType
:
maxf = udf(lambda xs: max(xs), FloatType())
maxd = udf(lambda xs: max(xs), DoubleType())
(sc
.parallelize([("Joe", [1.0, 2.0, 3.0])])
.toDF(["name", "scores"])
.select("*", maxf("scores"), maxd("scores")))
与结果:+----+---------------+----------------+----------------+
|name| scores|<lambda>(scores)|<lambda>(scores)|
+----+---------------+----------------+----------------+
| Joe|[1.0, 2.0, 3.0]| 3.0| 3.0|
+----+---------------+----------------+----------------+
和模式:
root
|-- name: string (nullable = true)
|-- scores: array (nullable = true)
| |-- element: double (containsNull = true)
|-- <lambda>(scores): float (nullable = true)
|-- <lambda>(scores): double (nullable = true)