我对Spark和SQL还很陌生。我正试图在我的df中添加一列(然后将其保存到一个Delta表中(,该列为每个记录/行提供一个唯一的id,并在每次更新特定记录时将其递增。
我试着做以下事情:
SELECT etc,
CONCAT(somerows1) as id1,
ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc
somerows1是几个列的串联,以便形成一个唯一的记录。我对以特定形式排序的记录没有特别的兴趣,这就是我选择ORDERBY(SELECT NULL(的原因。
我得到以下错误:
Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;
有人知道如何解决这个问题吗?
感谢
我已经通过在.writeStream
上使用foreachBatch接收器解决了这个问题。这允许您创建一个函数,其中流数据帧被视为静态/批处理数据帧(该函数应用于每个微批处理(。
在Scala中,代码看起来像这样:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number, lit}
val saveWithWindowFunction = (sourceDf: DataFrame, batchId: Long) => {
val windowSpec = Window
.partitionBy("somerows1")
.orderBy(lit(null))
sourceDf
.withColumn("versionid", row_number().over(windowSpec))
//... save the dataframe using: sourceDf.write.save()
}
.writeStream
调用您的函数:
.writeStream
.format("delta")
.foreachBatch(saveWithWindowFunction)
.start()
您想要的是在一个滑动事件时间窗口上进行聚合。请查看此处的文档和示例。