How to implement lead in a Spark DataFrame



I want to add a column containing the next timestamp. I looked at various sites and wrote the code below, but it raises an error. How can I fix it, and why does the error occur?

+--------------------+
|           Timestamp|
+--------------------+
|2014-04-01 12:00:...|
|2014-04-01 12:00:...|
|2014-04-01 12:00:...|
|2014-04-01 12:00:...|
|2014-04-01 12:00:...|
+--------------------+

w = Window.partitionBy(("Timestamp")).orderBy(("Timestamp"))
df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show(3)

Error message

Py4JJavaError                             Traceback (most recent call last)
/var/folders/3f/cj3qrr9x2dlgwkp147wyd6280000gn/T/ipykernel_20362/1581562162.py in <module>
5 from pyspark.sql.functions import *
6 w = Window.partitionBy(("Timestamp")).orderBy(("Timestamp"))
----> 7 df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show(3)
8 
.
.
.
Py4JJavaError: An error occurred while calling o1334.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(Timestamp#541 ASC NULLS FIRST, 200)
+- *(1) FileScan csv [Session_ID#540,Timestamp#541,Item_ID#542,Category#543] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/iwayamayuto/Desktop/yoochoose-clicks.dat], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Session_ID:string,Timestamp:timestamp,Item_ID:string,Category:string>

You get an error because Window.partitionBy(("Timestamp")).orderBy(("Timestamp")) does not really make sense. You are partitioning by Timestamp and then, within each partition, ordering the records by Timestamp. Within any given partition all the Timestamp values are equal, so ordering by them is meaningless, and Spark does not allow it.
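
If what you actually want is the next timestamp within each session rather than across the whole dataset, the simplest fix is to partition by a genuine grouping key. A minimal sketch, assuming the Session_ID column visible in the schema of the traceback above is the intended grouping:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Partition by the grouping key and order by Timestamp within each group,
# so lead() returns the next timestamp of the same session.
w = Window.partitionBy("Session_ID").orderBy("Timestamp")
df_FD.withColumn("end_date", F.lead("Timestamp", 1).over(w)).show(3)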

Computing the lead (or lag) of a value over an entire dataframe is tricky in Spark. Spark is a distributed computation engine that works partition by partition. One solution is to leave the window unpartitioned:

w = Window.orderBy(("Timestamp"))
df_FD.withColumn("end_date", lead("Timestamp", 1).over(w)).show()
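
On a small frame this works and shows the semantics: each row receives the following row's value, and the last row, having no successor, gets NULL. A minimal, self-contained sketch with toy data (hypothetical, not the questioner's yoochoose-clicks.dat):

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical toy data standing in for df_FD.
tiny = spark.createDataFrame(
    [("2014-04-01 12:00:01",), ("2014-04-01 12:00:05",), ("2014-04-01 12:00:09",)],
    ["Timestamp"],
)

w = Window.orderBy("Timestamp")
# end_date holds the next Timestamp; the last row's end_date is NULL.
tiny.withColumn("end_date", F.lead("Timestamp", 1).over(w)).show()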

On a dataset of any real size, however, this is not a good idea. Spark will emit a warning, because an unpartitioned window means moving all the data into a single partition. If that were feasible, you probably would not need Spark in the first place. In general it ends in an OOM error.

There is a more involved solution (explained in the comments):

from pyspark.sql import Window
from pyspark.sql import functions as F

# let's sort the dataframe by timestamp and add a column containing
# the partition index
sorted_df = (df.orderBy("Timestamp")
    .select("Timestamp", F.spark_partition_id().alias("partition_index"))
    .cache())

# let's collect the minimal timestamp of each partition.
# That timestamp is the lead of the last timestamp of the previous partition
last_lead = (sorted_df.groupBy("partition_index")
    .agg(F.min(F.col("Timestamp")).alias("first_ts"))
    .withColumn("partition_index", F.col("partition_index") - 1))

# Let's now create a window partitioned by partition index and ordered
# by Timestamp
w = Window.partitionBy("partition_index").orderBy("Timestamp")

# We compute the lead within each partition of w, and join with last_lead
# to compute the lead of the last item of each partition
(sorted_df.withColumn("lead", F.lead("Timestamp", 1).over(w))
    .join(last_lead, "partition_index", "left")
    .withColumn("lead", F.coalesce("lead", "first_ts"))
    .drop("first_ts")
    .show())
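
To sanity-check the stitching across partition boundaries, the snippet above can be run with df bound to a small hypothetical frame; every row's lead should then equal the following Timestamp, and only the very last row (for which no partition supplies a first_ts) should come back NULL:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-in for the real data: six sortable string timestamps.
df = spark.createDataFrame(
    [("2014-04-01 12:00:0%d" % i,) for i in range(1, 7)],
    ["Timestamp"],
)

Note that the join-based patch only matters when the sort actually spreads the rows over several partitions; if everything lands in one partition, the left join simply finds no match and coalesce keeps the in-partition lead.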
