Spark 窗口自定义函数 - 获取分区记录的总数



>我有一个时间序列数据集,它按 id 分区,并按时间戳排序。样本:

  ID     Timestamp   Feature
 "XSC"   1986-05-21  44.7530
 "XSC"   1986-05-22  44.7530
 "XSC"   1986-05-23  23.5678
 "TM"    1982-03-08  22.2734
 "TM"    1982-03-09  22.1941
 "TM"    1982-03-10  22.0847
 "TM"    1982-03-11  22.1741
 "TM"    1982-03-12  22.1840
 "TM"    1982-03-15  22.1344

我有一些需要计算的自定义逻辑,它应该在每个分区内按窗口完成。我知道Spark对窗口函数有丰富的支持,我正在尝试为此目的使用它。

我的逻辑需要当前窗口/分区中的元素总数作为标量。我需要它来做一些特定的计算(基本上,一个 for 循环到这个计数)。

我试图添加一个计数列,通过做一个

val window = Window.partitionBy("id").orderBy("timestamp") 
frame = frame.withColumn("my_cnt", count(column).over(window))

我需要执行以下操作:

var i = 1
var y = col("Feature")
var result = y
while (i < /* total number of records within each partition goes here */) {
    result = result + lit(1) * lag(y, i).over(window) + /* complex computation */
    i = i + 1
}
dataFrame.withColumn("Computed_Value", result)

如何获取每个分区中的记录总数作为标量值?我还添加了计数"my_cnt"值,它添加了分区的总值,但似乎无法在我的情况下使用它。

Spark

collect_list函数允许您将窗口化值聚合为列表。此列表可以传递给udf以进行一些复杂的计算

所以如果你有来源

val data = List(
  ("XSC", "1986-05-21", 44.7530),
  ("XSC", "1986-05-22", 44.7530),
  ("XSC", "1986-05-23", 23.5678),
  ("TM", "1982-03-08", 22.2734),
  ("TM", "1982-03-09", 22.1941),
  ("TM", "1982-03-10", 22.0847),
  ("TM", "1982-03-11", 22.1741),
  ("TM", "1982-03-12", 22.1840),
  ("TM", "1982-03-15", 22.1344),
).toDF("id", "timestamp", "feature")
  .withColumn("timestamp", to_date('timestamp))

还有一些复杂的函数,包装在你的记录上的UDF中(例如,表示为元组)

 val complexComputationUDF = udf((list: Seq[Row]) => {
  list
    .map(row => (row.getString(0), row.getDate(1).getTime, row.getDouble(2)))
    .sortBy(-_._2)
    .foldLeft(0.0) {
      case (acc, (id, timestamp, feature)) => acc + feature
    }
})
您可以定义将所有分区数据传递到每条记录的窗口

,或者在有序窗口的情况下,定义增量数据传递到每条记录

val windowAll = Window.partitionBy("id")
val windowRunning = Window.partitionBy("id").orderBy("timestamp")

并将它们放在一个新的数据集中,例如:

val newData = data
  // I assuming thatyou need id,timestamp & feature for the complex computattion. So I create a struct
  .withColumn("record", struct('id, 'timestamp, 'feature))
  // Collect all records in the partition as a list of tuples and pass them to the complexComupation
  .withColumn("computedValueAll",
     complexComupationUDF(collect_list('record).over(windowAll)))
  // Collect records in a time ordered windows in the partition as a list of tuples and pass them to the complexComupation
  .withColumn("computedValueRunning",
     complexComupationUDF(collect_list('record).over(windowRunning)))

这将导致类似以下内容:

+---+----------+-------+--------------------------+------------------+--------------------+
|id |timestamp |feature|record                    |computedValueAll  |computedValueRunning|
+---+----------+-------+--------------------------+------------------+--------------------+
|XSC|1986-05-21|44.753 |[XSC, 1986-05-21, 44.753] |113.07379999999999|44.753              |
|XSC|1986-05-22|44.753 |[XSC, 1986-05-22, 44.753] |113.07379999999999|89.506              |
|XSC|1986-05-23|23.5678|[XSC, 1986-05-23, 23.5678]|113.07379999999999|113.07379999999999  |
|TM |1982-03-08|22.2734|[TM, 1982-03-08, 22.2734] |133.0447          |22.2734             |
|TM |1982-03-09|22.1941|[TM, 1982-03-09, 22.1941] |133.0447          |44.4675             |
|TM |1982-03-10|22.0847|[TM, 1982-03-10, 22.0847] |133.0447          |66.5522             |
|TM |1982-03-11|22.1741|[TM, 1982-03-11, 22.1741] |133.0447          |88.7263             |
|TM |1982-03-12|22.184 |[TM, 1982-03-12, 22.184]  |133.0447          |110.91029999999999  |
|TM |1982-03-15|22.1344|[TM, 1982-03-15, 22.1344] |133.0447          |133.0447            |
+---+----------+-------+--------------------------+------------------+--------------------+

最新更新