我正在使用dataFrames,其中元素的模式类似于:
root
|-- NPAData: struct (nullable = true)
| |-- NPADetails: struct (nullable = true)
| | |-- location: string (nullable = true)
| | |-- manager: string (nullable = true)
| |-- service: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- serviceName: string (nullable = true)
| | | |-- serviceCode: string (nullable = true)
|-- NPAHeader: struct (nullable = true)
| | |-- npaNumber: string (nullable = true)
| | |-- date: string (nullable = true)
在我的数据框架中,我想对具有相同NPAHeader.code
的所有元素进行分组,因此,我使用以下行:
val groupedNpa = orderedNpa.groupBy($"NPAHeader.code" ).agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa"))
之后,我有一个带有以下模式的数据框:
StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...)))))
每行的一个示例与以下内容相似:
[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]
现在,我想要的是生成另一个数据框,其中只有包装中的一个元素之一,所以我想要一个类似的输出:
[1234,npaNew]
注意:包裹段中所选的元素是在整个包装中迭代后匹配副本逻辑的元素。但是为了简化这个问题,我将始终接收包装的最后一个元素(迭代遍布所有内容)。
要这样做,我想定义一个重复的UDF
import org.apache.spark.sql.functions.udf
def returnRow(elementList : Row)(index:Int): Row = {
val dif = elementList.size - index
val row :Row = dif match{
case 0 => elementList.getAs[Row](index)
case _ => returnRow(elementList)(index + 1)
}
row
}
val returnRow_udf = udf(returnRow _)
groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))}
,但我在地图中遇到以下错误:
线程" main" java.lang.unsupportedoperationException中的例外: 类型INT =>单位的模式不支持
我在做什么错?
顺便说一句,我不确定我是否使用groupedNpa("npa")
正确传递了npa
列。我正在将包裹段作为一行,因为我不知道如何通过 Array[Row]
迭代( get(index)
方法不存在在数组[row]中)
tl; dr 只需使用如何选择每个组的第一行中描述的一种方法?
如果要使用复杂的逻辑,并且返回Row
可以跳过SQL API并使用groupByKey
:
val f: (String, Iterator[org.apache.spark.sql.Row]) => Row
val encoder: Encoder
df.groupByKey(_.getAs[String]("NPAHeader.code")).mapGroups(f)(encoder)
或更好:
val g: (Row, Row) => Row
df.groupByKey(_.getAs[String]("NPAHeader.code")).reduceGroups(g)
其中 encoder
是有效的 RowEncoder
(试图将数据框架行映射到更新的行时的编码错误)。
您的代码在多种方面有故障:
groupBy
不能保证值的顺序。所以:orderBy(...).groupBy(....).agg(collect_list(...))
可以具有非确定性输出。如果您真的决定走这条路线,则应跳过
orderBy
并明确对收集的数组进行排序。您不能将咖喱功能传递给
udf
。您必须首先取消它,但是它需要不同的参数(请参见下面的示例)。如果可以的话,这可能是调用它的正确方法(请注意,您省略了第二个参数):
returnRow_udf(groupedNpa("npa")(0))
更糟的是,您将其称为
map
中,其中udfs
根本不适用。udf
无法返回Row
。它必须返回外部Scala类型。-
array<struct>
的外部表示是Seq[Row]
。您不能只用Row
代替它。 SQL数组可以通过
apply
索引访问:df.select($"array"(size($"array") - 1))
,但由于非确定性而不是正确的方法。您可以应用
sort_array
,但正如一开始所指出的,有更有效的解决方案。令人惊讶的是,递归不是那么重要。您可以像这样设计功能:
def size(i: Int=0)(xs: Seq[Any]): Int = xs match { case Seq() => i case null => i case Seq(h, t @ _*) => size(i + 1)(t) } val size_ = udf(size() _)
它可以正常工作:
Seq((1, Seq("a", "b", "c"))).toDF("id", "array") .select(size_($"array"))
虽然递归是一个过度杀伤,但如果您只能在
Seq
上迭代。