Error when writing DataFrame: java.lang.RuntimeException: scala.Tuple2 is not a valid external type for schema of struct&lt;retailer:string,postcode:int&gt;

I have a dataset from which I am extracting and applying a specific schema before writing it out as json.

My test dataset looks like this:

cityID|retailer|postcode
123|a1|1
123|s1|2
123|d1|3
124|a1|4
124|s1|5
124|d1|6

I want to group by city ID, then apply the schema below and put the result into a DataFrame, and finally write the data out as json. My code is as follows:

Group by city ID:

val rdd1 = cridf.rdd.map(x=>(x(0).toString, (x(1).toString, x(2).toString))).groupByKey() 

Map the RDD to Rows:

import org.apache.spark.sql.Row

val final1 = rdd1.map(x => Row(x._1, x._2.toList))

Apply the schema:

import org.apache.spark.sql.types._

val schema2 = new StructType()
  .add("cityID", StringType)
  .add("reads", ArrayType(new StructType()
    .add("retailer", StringType)
    .add("postcode", IntegerType)))

Create the DataFrame:

val parsedDF2 = spark.createDataFrame(final1, schema2)

Write the json file:

parsedDF2.write.mode("overwrite")
  .format("json")
  .option("header", "false")
  .save("/XXXX/json/testdata")

The job aborts with the following error:

java.lang.RuntimeException: Error while encoding:

java.lang.RuntimeException: scala.Tuple2 is not a valid external type for schema of struct&lt;retailer:string,postcode:int&gt;
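For context: with spark.createDataFrame(rdd, schema), a field declared as a struct must be backed by a Row object, so the (String, String) tuples inside each record need to become Rows before the schema can be applied. A minimal sketch of that direct fix, reusing rdd1 and schema2 from the question:

val final1 = rdd1.map { case (city, reads) =>
  // wrap each inner tuple in a Row so it matches the declared struct type
  Row(city, reads.toList.map { case (retailer, postcode) => Row(retailer, postcode.toInt) })
}
val parsedDF2 = spark.createDataFrame(final1, schema2)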

That said, you can do the whole conversion directly with the DataFrame API. Here you go:

val rawData = spark.read.option("header", "true").option("delimiter", "|").csv("57407427.csv")
import org.apache.spark.sql.functions._
val readsDf = rawData.withColumn("reads", struct("retailer", "postcode")).drop("retailer", "postcode")
val finalJsonDf = readsDf.groupBy("cityID").agg(collect_list("reads").alias("reads"))
finalJsonDf.printSchema() // for testing the schema
finalJsonDf.coalesce(1).write.mode("overwrite")
  .format("json")
  .option("header", "false")
  .save("57407427_Op.json")

Hopefully this is the same json output you are trying to write:

{"cityID":"124","reads":[{"retailer":"a1","postcode":"4"},{"retailer":"s1","postcode":"5"},{"retailer":"d1","postcode":"6"}]}
{"cityID":"123","reads":[{"retailer":"a1","postcode":"1"},{"retailer":"s1","postcode":"2"},{"retailer":"d1","postcode":"3"}]}

If you can't avoid using an RDD, you can use case classes:

case class Read(retailer: String, postcode: Int)
case class Record(cityId: String, reads: List[Read])
...
import spark.implicits._ // needed for .toDF

val rdd1 = cridf.rdd
  .map(x => (x(0).toString, Read(x(1).toString, x(2).toString.toInt)))
  .groupByKey
val final1 = rdd1
  .map(x => Record(x._1, x._2.toList))
  .toDF

final1
  .write
  .mode("overwrite")
  .format("json")
  .option("header", "false")
  .save("/XXXX/json/testdata")

final1 has the following schema:

root
|-- cityId: string (nullable = true)
|-- reads: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- retailer: string (nullable = true)
|    |    |-- postcode: integer (nullable = false)

However, I think @partha_devArch's solution is much better.

Update

With only a minimal addition to your code, and using the schema you provided, the solution would look like this:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
...
val rdd1 = cridf.rdd
  .map(x => (x(0).toString, Row(x(1).toString, x(2).toString.toInt)))
  .groupByKey
val final1 = rdd1
  .map(x => Row(x._1, x._2.toList))(RowEncoder.apply(schema2).clsTag)
val parsedDF2 = spark.createDataFrame(final1, schema2)
parsedDF2
  .write
  .mode("overwrite")
  .format("json")
  .option("header", "false")
  .save("/XXXX/json/testdata")
