UDF to transform a map<bigint,struct<in1:bigint,in2:string>> column to add more fields to the inner struct

I have a Hive table which, when read into Spark via spark.table(<table_name>), has the following structure:

scala> df.printSchema
root
|-- id: long (nullable = true)
|-- info: map (nullable = true)
|    |-- key: long
|    |-- value: struct (valueContainsNull = true)
|    |    |-- in1: long (nullable = true)
|    |    |-- in2: string (nullable = true)

I want to cast the map column to add more fields to the inner struct, e.g. in3 and in4, giving in this example: map<bigint,struct<in1:bigint,in2:string,in3:decimal(18,5),in4:string>>. A normal cast doesn't work, so I'm checking whether this can be done with a UDF. I would assign default values to the new fields, say 0 for the decimal and "0" for the string. Below is what I tried, but I couldn't get it to work. Can anyone suggest how to do this?

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

val origStructType = new StructType().add("in1", LongType, nullable = true).add("in2", StringType, nullable = true)
// Note: re-adding in1/in2 here duplicates them in the result schema (visible in the output below)
val newStructType = origStructType.add("in1", LongType, nullable = true).add("in2", StringType, nullable = true).add("in3", DecimalType(18,5), nullable = true).add("in4", StringType, nullable = true)
val newColSchema = MapType(LongType, newStructType)      
val m = Map(101L->(101L,"val2"),102L->(102L,"val3"))
val df = Seq((100L,m)).toDF("id","info")
val typeUDFNewRet = udf((col1: Map[Long,Seq[(Long,String)]]) => {
col1.mapValues(v => Seq(v(0),v(1),null,"")) //Forced to use null here for another issue
}, newColSchema)
spark.udf.register("typeUDFNewRet",typeUDFNewRet)
df.registerTempTable("op1")
scala> val df2 = spark.sql("select id, typeUDFNewRet(info) from op1")
df2: org.apache.spark.sql.DataFrame = [id: bigint, UDF(info): map<bigint,struct<in1:bigint,in2:string,in1:bigint,in2:string,in3:decimal(18,5),in4:string>>]
scala> df2.show(false)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.collection.Seq
at $anonfun$1$$anonfun$apply$1.apply(<console>:43)
at scala.collection.MapLike$MappedValues$$anonfun$iterato

I also tried returning a Row as in this answer, but that causes a different problem.
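
A minimal sketch of why the cast above fails, assuming the sample df built earlier: Spark hands each struct value to a Scala UDF as an org.apache.spark.sql.Row, not as a Seq or a tuple, so a parameter typed Map[Long, Seq[(Long, String)]] can never match at runtime. With the same untyped udf(f, schema) variant used in this thread, a Row-typed parameter on the input side goes through:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{LongType, MapType, StringType}

// Sketch only: with the map value typed as Row, the GenericRowWithSchema
// cast succeeds; the body just stringifies each struct to prove the point.
val probe = udf((m: Map[Long, Row]) => m.map { case (k, v) => k -> v.toString },
  MapType(LongType, StringType))
df.select(probe(col("info"))).show(false)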

Try this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

val origStructType = new StructType().add("in1", LongType, nullable = true).add("in2", StringType, nullable = true)
val newStructType = origStructType.add("in3", DecimalType(18,5), nullable = true).add("in4", StringType, nullable = true)
val newColSchema = MapType(LongType, newStructType)
val m = Map(101L->(101L,"val2"),102L->(102L,"val3"))
val df = Seq((100L,m)).toDF("id","info")
df.show(false)
df.printSchema()
val typeUDFNewRet = udf((col1: Map[Long,Row]) => {
col1.mapValues(r => Row.merge(r, Row(null, ""))) //Forced to use null here for another issue
}, newColSchema)
spark.udf.register("typeUDFNewRet",typeUDFNewRet)
df.registerTempTable("op1")
val df2 = spark.sql("select id, typeUDFNewRet(info) from op1")
df2.show(false)
df2.printSchema()
/**
* +---+----------------------------------------------+
* |id |UDF(info)                                     |
* +---+----------------------------------------------+
* |100|[101 -> [101, val2,, ], 102 -> [102, val3,, ]]|
* +---+----------------------------------------------+
*
* root
* |-- id: long (nullable = false)
* |-- UDF(info): map (nullable = true)
* |    |-- key: long
* |    |-- value: struct (valueContainsNull = true)
* |    |    |-- in1: long (nullable = true)
* |    |    |-- in2: string (nullable = true)
* |    |    |-- in3: decimal(18,5) (nullable = true)
* |    |    |-- in4: string (nullable = true)
*/
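
As an aside, on Spark 3.1+ the same reshaping can be done without a UDF at all, using transform_values together with Column.withField (neither is available in older releases). A minimal sketch against the sample df, mirroring the null/empty-string defaults used above:

import org.apache.spark.sql.functions._

// Spark 3.1+ sketch: append the two new fields to each map value in place.
val df3 = df.withColumn("info",
  transform_values(col("info"), (_, v) =>
    v.withField("in3", lit(null).cast("decimal(18,5)"))
     .withField("in4", lit(""))))
df3.printSchema()

This keeps the whole transformation inside Catalyst and avoids the Row round-trip through a Scala UDF entirely.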
