Error mapping simple values from a Map onto a Spark DataFrame

I recently started working with Spark in Scala, and I find myself in a situation where I want to map some values from a HashMap/Map onto a DataFrame, without having to build a new DataFrame and then perform some kind of join.

I have this DataFrame:

+---+-------+---+----------+---------+
| id|   name|age|      date|genderKey|
+---+-------+---+----------+---------+
|  1|Rodrigo| 30|2019-01-01|     male|
|  2|Roberto| 23|2019-01-01|     male|
|  3|Roberto| 25|2019-01-01|     male|
|  4|Rodrigo| 30|2019-01-01|     male|
|  5|Mariana| 32|2019-01-01|   female|
+---+-------+---+----------+---------+

And this Map:

var genderMap = Map[String, String](
  "male" -> "Masculine",
  "female" -> "Feminine"
)

I would like to add a column containing the mapped values to the DataFrame via a user-defined function:

val getGenderName = udf((gender: String) => genderMap(gender))
dfPeople
  .withColumn("genderName", getGenderName(col("genderKey")))
  .show()

When I execute the show action, I get this error:

sqlfunc: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(IntegerType)))
getGender: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
java.lang.InternalError: Malformed class name
at java.lang.Class.getSimpleBinaryName(Class.java:1450)
at java.lang.Class.getSimpleName(Class.java:1309)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage$lzycompute(ScalaUDF.scala:1048)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage(ScalaUDF.scala:1047)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.doGenCode(ScalaUDF.scala:1000)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
at org.apache.spark.sql.execution.BaseLimitExec$class.doProduce(limit.scala:70)
at org.apache.spark.sql.execution.LocalLimitExec.doProduce(limit.scala:97)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.LocalLimitExec.produce(limit.scala:97)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
... 118 elided
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -10
at java.lang.String.substring(String.java:1931)
at java.lang.Class.getSimpleBinaryName(Class.java:1448)
... 185 more

Any ideas?

Thanks!

From Spark 2.2+:

You don't have to use a UDF here; instead, use the built-in typedLit function to create the lookup.

Example:

import org.apache.spark.sql.functions._

val df = Seq(
  ("1","Rodrigo","30","2019-01-01","male"),
  ("1","Rodrigo","30","2019-01-01","female"),
  ("1","Rodrigo","30","2019-01-01","mal")
).toDF("id","name","age","date","genderKey")

val genderMap = typedLit(Map("male" -> "Masculine", "female" -> "Feminine"))
//genderMap: org.apache.spark.sql.Column = keys: [male,female], values: [Masculine,Feminine]

df.withColumn("genderName", coalesce(genderMap(col("genderKey")), lit("not found"))).show()
//+---+-------+---+----------+---------+----------+
//| id|   name|age|      date|genderKey|genderName|
//+---+-------+---+----------+---------+----------+
//|  1|Rodrigo| 30|2019-01-01|     male| Masculine|
//|  1|Rodrigo| 30|2019-01-01|   female|  Feminine|
//|  1|Rodrigo| 30|2019-01-01|      mal| not found|
//+---+-------+---+----------+---------+----------+
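If you are on a Spark version before 2.2, where typedLit is not available, the same lookup can be expressed without a UDF by folding the Map into a chain of when/otherwise column expressions. A minimal sketch, assuming the same df and lookup Map as above:

import org.apache.spark.sql.functions.{col, lit, when}

// Fold the lookup Map into nested when/otherwise expressions,
// falling back to "not found" when the key is missing.
val genderExpr = Map("male" -> "Masculine", "female" -> "Feminine")
  .foldLeft(lit("not found")) { case (acc, (key, value)) =>
    when(col("genderKey") === key, value).otherwise(acc)
  }

df.withColumn("genderName", genderExpr).show()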

Using a UDF:

var genderMap = Map[String, String](
  "male" -> "Masculine",
  "female" -> "Feminine"
)

def getGenderName(genderMap: Map[String, String]) =
  udf((gender: String) => genderMap.getOrElse(gender, "not found"))

df.withColumn("genderName", getGenderName(genderMap)(col("genderKey"))).show()
//+---+-------+---+----------+---------+----------+
//| id|   name|age|      date|genderKey|genderName|
//+---+-------+---+----------+---------+----------+
//|  1|Rodrigo| 30|2019-01-01|     male| Masculine|
//|  1|Rodrigo| 30|2019-01-01|   female|  Feminine|
//|  1|Rodrigo| 30|2019-01-01|      mal| not found|
//+---+-------+---+----------+---------+----------+
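As for the original java.lang.InternalError: Malformed class name, this usually comes from Class.getSimpleName failing on the deeply nested synthetic classes that the Scala REPL wraps around inline closures, so UDFs declared directly in spark-shell or a notebook cell can trigger it on some Spark 2.x builds. Moving the function into a plain top-level object (compiled into a jar, or pasted as a whole) tends to avoid it. A sketch under that assumption, where the object name is made up for illustration:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical container object: keeping the closure out of the
// REPL's nested wrapper classes gives it a simple class name, which
// sidesteps the getSimpleName failure behind "Malformed class name".
object GenderLookup {
  private val genderMap = Map("male" -> "Masculine", "female" -> "Feminine")

  val getGenderName: UserDefinedFunction =
    udf((gender: String) => genderMap.getOrElse(gender, "not found"))
}

df.withColumn("genderName", GenderLookup.getGenderName(col("genderKey"))).show()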
