Pyspark pyspark.sql.函数表现得很奇怪



pyspark.sql.functions 下的 "last" 函数在 Spark 上返回不一致的结果,当我们有超过 3 个节点来分发数据时。

以下是可以轻松重新生成问题的代码。

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
numeric = sqlContext.createDataFrame([('purple', '5.0', '20'), ('blue', '2.6', '19'),  
('purple', '3.8', '15'),('purple', '3', '12'),  ('purple', '2', '4.0'), ('blue', '2', '4.0'),
('purple', '10', '11'),  ('purple', '2.5', '4.8'), 
('blue', '2.3', '4.9')],('color', 'v1', 'v2')) 
numeric.printSchema()
numeric = numeric.withColumn("v1t", numeric["v1"].cast(DoubleType()))
numeric.printSchema()
sort_numeric = numeric.sort('v1t', ascending=True)
last_by_color = sort_numeric.groupBy("color").agg(F.last("v1").alias("last_v1"), F.last("v2").alias("last_v2"))

每次"last_by_color.show()"都会给你不同的结果。这是我在 --master yarn-client 上测试时的终端输出(本地总是没问题)

请参阅附图。

Terminal_output

pyspark.sql.functions.firstpyspark.sql.functions.last函数是不确定的,因为它的结果取决于行的顺序,这些顺序在洗牌后可能是不确定的。

Pyspark 文档 pyspark.sql.functions.last

建议使用第一个和最后一个的方法是使用窗口finction。

from pyspark.sql.window import Window
w = Window().partitionBy("<your_key_column>").orderBy('<your_order_by_column>')

在您的情况下,按列定义窗口分区时将是"颜色"。按列排序通常是日期/时间戳,我们可以选择以发送和降序两种方式对数据进行排序。您没有用于对数据进行排序的日期列。

from pyspark.sql.functions import last
numeric.withColumn("v1_last", last('v1').over(w))
       .withColumn("v2_last", last('v2').over(w))
.show()

上面将给出numeric带有附加"last_v1"和"last_v2"列的数据框。因此,您需要将其减少为每个组/分区一条记录。

相关内容

  • 没有找到相关文章

最新更新