如何在spark中创建类型列表[行]的编码器?



我试图在Spark DataFrame上使用flatMapGroupWithState函数来自定义聚合。

flatMapGroupState的lambda中,我试图保存Row: GroupState[List[Row]]的List。上面提到的代码,是通过抱怨

得到Runtime Exception的。

没有找到org.apache.spark.sql.Row.

但对于outputEncoder,它正在工作。

问题是:我如何在spark中创建类型:List[Row]的编码器?

val outputEncoder = RowEncoder(stateSchema)
val stateEncoder: Encoder[List[Row]] = ExpressionEncoder()
df.flatMapGroupsWithState(OutputMode.Update(), timeoutConf = GroupStateTimeout.EventTimeTimeout())(func = aggregationOperations)(stateEncoder, outputEncoder)
~
~
~
private def aggregationOperations(key: String, values: Iterator[Row], state: GroupState[List[Row]]): Iterator[Row] = {
~
~
~
}

试试RowEncoder for

StructType(列表(StructField("state", ArrayType (stateSchema))))

最新更新