pyspark.sql.functions 下的 "last" 函数在 Spark 上返回不一致的结果,当我们有超过 3 个节点来分发数据时。
以下是可以轻松重新生成问题的代码。
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
numeric = sqlContext.createDataFrame([('purple', '5.0', '20'), ('blue', '2.6', '19'),
('purple', '3.8', '15'),('purple', '3', '12'), ('purple', '2', '4.0'), ('blue', '2', '4.0'),
('purple', '10', '11'), ('purple', '2.5', '4.8'),
('blue', '2.3', '4.9')],('color', 'v1', 'v2'))
numeric.printSchema()
numeric = numeric.withColumn("v1t", numeric["v1"].cast(DoubleType()))
numeric.printSchema()
sort_numeric = numeric.sort('v1t', ascending=True)
last_by_color = sort_numeric.groupBy("color").agg(F.last("v1").alias("last_v1"), F.last("v2").alias("last_v2"))
每次"last_by_color.show()"都会给你不同的结果。这是我在 --master yarn-client 上测试时的终端输出(本地总是没问题)
请参阅附图。
Terminal_output
pyspark.sql.functions.first
和pyspark.sql.functions.last
函数是不确定的,因为它的结果取决于行的顺序,这些顺序在洗牌后可能是不确定的。
Pyspark 文档 pyspark.sql.functions.last
建议使用第一个和最后一个的方法是使用窗口finction。
from pyspark.sql.window import Window
w = Window().partitionBy("<your_key_column>").orderBy('<your_order_by_column>')
在您的情况下,按列定义窗口分区时将是"颜色"。按列排序通常是日期/时间戳,我们可以选择以发送和降序两种方式对数据进行排序。您没有用于对数据进行排序的日期列。
from pyspark.sql.functions import last
numeric.withColumn("v1_last", last('v1').over(w))
.withColumn("v2_last", last('v2').over(w))
.show()
上面将给出numeric
带有附加"last_v1"和"last_v2"列的数据框。因此,您需要将其减少为每个组/分区一条记录。