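I am trying to use the flatMapGroupsWithState function on a Spark DataFrame to implement a custom aggregation. Inside the flatMapGroupsWithState lambda, I want to keep a List of Rows as the state, i.e. GroupState[List[Row]]. The code below fails with a runtime exception complaining: No Encoder found for org.apache.spark.sql.Row. The outputEncoder, however, works fine.

Question: how can I create an encoder for the type List[Row] in Spark?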
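import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

val outputEncoder = RowEncoder(stateSchema) // works: Encoder[Row] built from an explicit schema
// Fails at runtime: reflection cannot derive an encoder for Row, which has no static schema
val stateEncoder: Encoder[List[Row]] = ExpressionEncoder()
// flatMapGroupsWithState is defined on KeyValueGroupedDataset, so `df` must already be grouped (e.g. via groupByKey)
df.flatMapGroupsWithState(OutputMode.Update(), timeoutConf = GroupStateTimeout.EventTimeTimeout())(func = aggregationOperations)(stateEncoder, outputEncoder)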
~
~
~
private def aggregationOperations(key: String, values: Iterator[Row], state: GroupState[List[Row]]): Iterator[Row] = {
~
~
~
}
Try a RowEncoder for StructType(List(StructField("state", ArrayType(stateSchema)))), that is, wrap the list in a single-column Row and use GroupState[Row] as the state type.
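A minimal sketch of that suggestion, assuming a Spark version where `RowEncoder(schema)` is available (as in the question's own code, i.e. Spark 2.x to 3.4; newer releases offer `Encoders.row(schema)` instead, if your version has it). The element fields `id` and `value`, the object `ListStateEncoding`, the helpers `toStateRow`/`fromStateRow`, and the body of `aggregationOperations` are hypothetical stand-ins. The list is stored inside one wrapper Row and recovered with `getSeq`.

```scala
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.types._

object ListStateEncoding {
  // Hypothetical element schema standing in for the question's `stateSchema`.
  val stateSchema: StructType = StructType(List(
    StructField("id", StringType),
    StructField("value", LongType)
  ))

  // The answer's wrapper schema: one column "state" holding an array of
  // `stateSchema` structs.
  val wrappedSchema: StructType =
    StructType(List(StructField("state", ArrayType(stateSchema))))

  // Encoder for the wrapper row; the state type is now Row, not List[Row].
  // Assumes RowEncoder.apply exists in your Spark version.
  val stateEncoder: Encoder[Row] = RowEncoder(wrappedSchema)

  // Pack the list into the wrapper row before calling state.update(...).
  def toStateRow(rows: List[Row]): Row = Row(rows)

  // Unpack the wrapper row returned by state.get back into a List[Row].
  def fromStateRow(stateRow: Row): List[Row] = stateRow.getSeq[Row](0).toList

  // Hypothetical aggregation: appends the group's new rows to the stored list
  // and emits the accumulated rows. Assumes the input rows already have the
  // `stateSchema` shape, so they fit both the state and the output encoder.
  def aggregationOperations(key: String, values: Iterator[Row], state: GroupState[Row]): Iterator[Row] = {
    val previous: List[Row] = if (state.exists) fromStateRow(state.get) else Nil
    val updated: List[Row] = previous ++ values
    state.update(toStateRow(updated))
    updated.iterator
  }
}
```

It would be passed as `...(ListStateEncoding.aggregationOperations)(ListStateEncoding.stateEncoder, outputEncoder)` on the grouped dataset. The wrapper is needed because RowEncoder only ever produces an Encoder[Row], so the list has to become a column of that row rather than the top-level value.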