我正试图构建一个递归重写ArrayType列的spark函数:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.functions._
val arrayHead = udf((sequence: Seq[String]) => sequence.head)
val arrayTail = udf((sequence: Seq[String]) => sequence.tail)
// re-produces the ArrayType column recursively
val rewriteArrayCol = (c: Column) => {
def helper(elementsRemaining: Column, outputAccum: Column): Column = {
when(size(elementsRemaining) === lit(0), outputAccum)
.otherwise(helper(arrayTail(elementsRemaining), concat(outputAccum, array(arrayHead(elementsRemaining)))))
}
helper(c, array())
}
// Test
val df =
Seq("100" -> Seq("a", "b", "b", "b", "b", "b", "c", "c", "d"))
.toDF("id", "sequence")
// .withColumn("test_tail", arrayTail($"sequence")) //head & tail udfs work
// .withColumn("test", rewriteArrayCol($"sequence")) //stackoverflow if uncommented
display(df)
不幸的是,我一直在流鼻涕。我认为函数缺少的一个方面是它不是尾部递归的;即整个"when().otherwise()"块与"if-else"块不同。话虽如此,该函数目前在应用于即使是微小的数据帧时也会抛出堆栈流(所以我认为它肯定有比不进行尾部递归更大的错误)。
我在网上找不到任何类似功能的例子,所以我想在这里问一下。我能找到的Column=>Column函数的唯一实现是非常非常简单的,对这个用例没有帮助。
注意:我可以通过使用UDF来实现上述功能。我尝试创建Column=>Column函数的原因是,与UDF相比,Spark能够更好地优化这些函数(据我所知)。
这是行不通的,因为这里没有有意义的停止条件。when
/otherwise
不是语言级别的控制流块(因此不能中断执行),函数将永远递归。
事实上,即使在任何SQL评估上下文之外的空数组中,它也不会停止:
rewriteArrayCol(array())
此外,你的假设是不正确的。跳过代码反序列化数据两次(每个arrayHead
、arrayTail
一次)这一事实,这比只调用udf
一次要糟糕得多(尽管使用slice可以避免),非常复杂的表达式也有自己的问题,其中之一就是代码生成大小限制。
不过,不要绝望——已经有一个有效的解决方案了——那就是transform
。请参见如何使用变换高阶函数?