我的数据框架具有类似于:
的结构root
|-- NPAData: struct (nullable = true)
| |-- NPADetails: struct (nullable = true)
| | |-- location: string (nullable = true)
| | |-- manager: string (nullable = true)
| |-- service: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- serviceName: string (nullable = true)
| | | |-- serviceCode: string (nullable = true)
|-- NPAHeader: struct (nullable = true)
| | |-- npaNumber: string (nullable = true)
| | |-- date: string (nullable = true)
我正在尝试的是:
- 将已将相同
npaNumber
的记录分组到列表中 - 在每个列表中,根据其
date
订购元素 - 一旦我将元素分组和订购,我就需要合并应用一些逻辑的元素。要执行此列表步骤,我决定使用地图。
这是我到目前为止尝试的:
val toUpdate = sourceDF.withColumn("count", count($"NPAHeader").over(Window.partitionBy("NPAHeader.npaNumber").orderBy($"NPAHeader.date".desc))).filter($"count" > 1)
val groupedNpa = toUpdate.groupBy($"NPAHeader.npaNumber" ).agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa"))
//This is a simply version of my logic.
def pickOne(List: Seq[Row]): Row = {
println("First element: "+List.get(0))
List.get(0)
}
val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"),pickOne(row.getAs[Seq[Row]]("npa"))))
组之后的一排示例是:
[1234,包装([Npanew,npaolder,... npaoldest]]]
,但是当我尝试从地图中调用功能时,我会得到一个例外。
线程" main" java.lang.unsupportedoperationException中的例外:否 为org.apache.spark.sql.row找到编码器 - 字段(class:" org.apache.spark.sql.row",name:" _2") - 根类:" scala.tuple2"
我了解的是,我无法从地图中调用函数pickOne()
(或至少不能以我尝试的方式)。但是我不知道我在做什么错。
为什么我有例外?
感谢您的时间!
Note :我知道有更简单的方法可以从列表中挑选一个元素而不调用自定义功能。但是我需要调用是或"是",因为在下一步中,我需要放置更复杂的逻辑以合并行。
使用Mahesh Chand Kandpal建议:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
grouped.map(row => "emdNumber: "+row.getAs[String]("emdNumber"))
val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"),pickOne(row.getAs[Seq[Row]]("npa"))(RowEncoder(row.schema))))
我有以下错误:
键入不匹配;成立 : org.apache.spark.sql.catalyst.encoders.expressionencoder [org.apache.spark.sql.row] 必需:int
我应该如何应用编码器?
当您将映射与DataFrame一起使用时,您需要提供编码。
在Spark 2.x Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
import org.apache.spark.sql.catalyst.encoders.RowEncoder
implicit val encoder = RowEncoder(schema)