PySpark - Spark DataFrame数组不同于Python列表



如果我有一个包含arrays的Spark DataFrame,我可以通过UDF在这些数组上使用Python列表方法吗?如何将Spark DataFrame array<double>转换为Python列表?

下面是一个带有几个udf的示例。我不知道为什么采取最大的工作,但采取len没有。最后,我想用原始数组列的采样值创建一个新列。这也会导致期望两个参数的错误,如果你也能帮助解决这个问题,那就加分了!

我有以下Spark DataFrame:

from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
import random
df = sc.parallelize([Row(name='Joe',scores=[1.0,2.0,3.0]),
Row(name='Mary', scores=[3.0]),
Row(name='Mary', scores=[4.0,7.1])]).toDF()
>>> df.show()
+----+---------------+
|name|         scores|
+----+---------------+
| Joe|[1.0, 2.0, 3.0]|
|Mary|          [3.0]|
|Mary|     [4.0, 7.1]|
+----+---------------+
>>> df
DataFrame[name: string, scores: array<double>]
def sampleWithReplacement(listIn,samples):
    tempList = array()
    count=0
    while (count<samples):
        tempList.append(random.sample(listIn,1)[0])
        count=count+1
    return tempList
def maxArray(listIn):
    return max(listIn)
def lenArray(listIn):
    return len(listIn)
sampUDF=udf(sampleWithReplacement,ArrayType())
maxUDF=udf(maxArray,IntegerType())
lenUDF=udf(lenArray,IntegerType())
>>> df.withColumn("maxCol",maxUDF(df.scores)).show()
+----+---------------+------+
|name|         scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]|  null|
|Mary|          [3.0]|  null|
|Mary|     [4.0, 7.1]|  null|
+----+---------------+------+
>>> df.withColumn("maxCol",lenUDF(df.scores)).show()
+----+---------------+------+
|name|         scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]|     3|
|Mary|          [3.0]|     1|
|Mary|     [4.0, 7.1]|     2|
+----+---------------+------+

TL;DR当您有选择时,总是更喜欢内置函数而不是udf。使用size(别名length))方法计算长度:

from pyspark.sql.functions import length, size 
df.withColumn("len", size("scores"))

对于小数组,可以尝试

from pyspark.sql.functions import sort_array
df.withColumn("max", sort_array("scores", False)[0])

但是对于大型集合来说,这当然不是一个好的选择。

Spark DataFrame数组与Python列表不同吗?

在内部它们是不同的,因为有Scala对象。当在udf中访问时,有一个普通的Python列表。那么到底出了什么问题呢?

让我们看一下类型。scores列为array<double>。当转换为Python类型时,结果是List[float]。当您调用max时,您将在输出上获得float

但是你声明返回类型为IntegerType。由于float不能转换为整数,因此精度损失结果是未定义的,您得到NULL。返回类型的正确选择是DoubleTypeFloatType:

maxf = udf(lambda xs: max(xs), FloatType())
maxd = udf(lambda xs: max(xs), DoubleType())
(sc
    .parallelize([("Joe", [1.0, 2.0, 3.0])])
    .toDF(["name", "scores"])
    .select("*", maxf("scores"), maxd("scores")))
与结果:

+----+---------------+----------------+----------------+
|name|         scores|<lambda>(scores)|<lambda>(scores)|
+----+---------------+----------------+----------------+
| Joe|[1.0, 2.0, 3.0]|             3.0|             3.0|
+----+---------------+----------------+----------------+

和模式:

root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- <lambda>(scores): float (nullable = true)
 |-- <lambda>(scores): double (nullable = true)

相关内容

  • 没有找到相关文章

最新更新