Spark Scala scala.MatchError of class scala.collection.immutable.$colon$colon



I am building a struct map column by querying and joining columns of a Hive table. Later I group those records on the id column in order to build a map of the related records for each id, which is then joined to other dataframes before being written back to a Hive table.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, MapType, ArrayType, LongType}
import scala.collection.Map
import scala.collection.JavaConversions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
val eschema = new StructType(Array(
  StructField("id", LongType, nullable = false),
  StructField("DEFINITION", MapType(StringType, StructType(List(
    StructField("a", LongType, true),
    StructField("b", StringType, true),
    StructField("c", StringType, true),
    StructField("d", StringType, true),
    StructField("e", StringType, true),
    StructField("f", StringType, true),
    StructField("g", StringType, true),
    StructField("h", StringType, true),
    StructField("i", StringType, true),
    StructField("j", StringType, true),
    StructField("k", StringType, true)
  ))))
))
val etrans = sqlContext.sql("""select id, map(table.col1, named_struct("a", table.col2, "b", table.col3, "c", table.col4, "d", table.col5, "e", table.col6, "f", table.col7, "g", table.col8, "h", table.col9, "i", table.col10, "j", table.col11, "k", table.col12)) AS DEFINITION from table""")
val aggregatedRdd: RDD[Row] = etrans.rdd.groupBy(r => r.getAs[Long]("id")).map(row => Row(row._1, row._2.map(_.getAs[Map[String, List[(String, Any)]]]("DEFINITION")).toList))
val aggregatedDf = sqlContext.createDataFrame(aggregatedRdd, eschema)
aggregatedDf.registerTempTable("event")
aggregatedDf.printSchema()
aggregatedDf.show()             

I am getting the following MatchError:

ERROR Executor: Exception in task 0.0 in stage 83.0 (TID 3652)
scala.MatchError: List(Map(qwe -> [204,,abc,,positive,False,everywhere,always_record,counter,xyz,disabled]), Map(N/A -> [20,,something,,null,null,null,null,null,null,null]), Map(xyz -> [220,,something,,positive,False,everywhere,always_record,counter,xyz,enabled])) (of class scala.collection.immutable.$colon$colon)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toCatalystImpl(CatalystTypeConverters.scala:201)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toCatalystImpl(CatalystTypeConverters.scala:193)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)

The class in the MatchError is:

class scala.collection.immutable.$colon$colon
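That name is just the JVM encoding of Scala's `::` ("cons") class, the node type every non-empty immutable List is built from. A quick standalone check (independent of the Spark code above):

```scala
// :: ("cons") is the concrete class behind every non-empty immutable List;
// its JVM name is scala.collection.immutable.$colon$colon, which is exactly
// the class reported in the MatchError.
val xs: List[Int] = 1 :: 2 :: Nil
println(xs.getClass.getName)  // prints scala.collection.immutable.$colon$colon
```

So the MatchError is telling you that Catalyst's MapConverter was handed a List where the schema promised a Map.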

`$colon$colon` is `::`, the class implementing List, i.e. the type an ArrayType value is materialized as. I think the problem is that the cast to List[(String, Any)] never actually converts anything:

_.getAs[Map[String, List[(String, Any)]]]("DEFINITION"))

The getAs() you are using is not recursive; it is just an asInstanceOf[Map[...]]:

Here is the definition of getMap(), which may make that clearer:

def getMap[K, V](i: Int): scala.collection.Map[K, V] = getAs[Map[K, V]](i)
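In other words, getAs is a plain unchecked cast. Because of type erasure, the cast appears to succeed whatever the element types really are, and the mismatch only surfaces later, here inside Catalyst's converters. A minimal standalone sketch (the sample values are made up):

```scala
// Due to erasure, asInstanceOf only checks the outer Map class at runtime;
// the declared value type List[(String, Any)] is not verified here.
val raw: Any = Map("qwe" -> List(204L, "abc"))
val typed = raw.asInstanceOf[Map[String, List[(String, Any)]]]
// The cast above "succeeds"; the wrong element type would only fail
// later, at the point where an element is used as a (String, Any) tuple:
// val (name, value) = typed("qwe").head  // would throw ClassCastException
println(typed.keys.mkString(","))  // prints qwe
```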

You could then write a helper that walks over the Map's values and casts each one from the unknown V to List[(String, Any)].
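A minimal sketch of such a helper (the name `castDefinition` and its shape are mine, not from the Spark API); note that because of erasure only the outer List can be checked, not the (String, Any) element type:

```scala
// Hypothetical helper: walk the Map's values and cast each one explicitly.
// asInstanceOf[List[...]] at least verifies each value really is a List,
// failing fast with a ClassCastException instead of a MatchError deep
// inside Catalyst; the (String, Any) element type remains unchecked.
def castDefinition(m: Map[String, Any]): Map[String, List[(String, Any)]] =
  m.map { case (k, v) => k -> v.asInstanceOf[List[(String, Any)]] }

val in: Map[String, Any] = Map("qwe" -> List(("a", 204L), ("b", "abc")))
println(castDefinition(in)("qwe").head._1)  // prints a
```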
