将ROW_NUMBER列添加到流数据帧中



我对Spark和SQL还很陌生。我正试图在我的df中添加一列(然后将其保存到一个Delta表中(,该列为每个记录/行提供一个唯一的id,并在每次更新特定记录时将其递增。

我试着做以下事情:

SELECT etc,
CONCAT(somerows1) as id1,
ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc

somerows1是几个列的串联,以便形成一个唯一的记录。我对以特定形式排序的记录没有特别的兴趣,这就是我选择ORDERBY(SELECT NULL(的原因。

我得到以下错误:

Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;

有人知道如何解决这个问题吗?

感谢

我已经通过在.writeStream上使用foreachBatch接收器解决了这个问题。这允许您创建一个函数,其中流数据帧被视为静态/批处理数据帧(该函数应用于每个微批处理(。

在Scala中,代码看起来像这样:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number, lit}
val saveWithWindowFunction = (sourceDf: DataFrame, batchId: Long) => {
val windowSpec = Window
.partitionBy("somerows1") 
.orderBy(lit(null))

sourceDf
.withColumn("versionid", row_number().over(windowSpec))
//... save the dataframe using: sourceDf.write.save()
}

.writeStream调用您的函数:

.writeStream
.format("delta")
.foreachBatch(saveWithWindowFunction)
.start()

您想要的是在一个滑动事件时间窗口上进行聚合。请查看此处的文档和示例。

相关内容

  • 没有找到相关文章

最新更新