Spark Scala中从rdd到Dataframe的模式推理



这个问题来自(Spark-用不同的数据类型以编程方式创建模式(

我正在尝试从rdd到Dataframe推断模式,下面是我的代码

def inferType(field: String) = field.split(":")(1) match {
case "Integer" => IntegerType
case "Double" => DoubleType
case "String" => StringType
case "Timestamp" => TimestampType
case "Date" => DateType
case "Long" => LongType
case _ => StringType
}

val header = c1:String|c2:String|c3:Double|c4:Integer|c5:String|c6:Timestamp|c7:Long|c8:Date
val df1 = Seq(("a|b|44.44|5|c|2018-01-01 01:00:00|456|2018-01-01")).toDF("data")
val rdd1 = df1.rdd.map(x => Row(x.getString(0).split("\|"): _*))
val schema = StructType(header.split("\|").map(column => StructField(column.split(":")(0), inferType(column), true)))
val df = spark.createDataFrame(rdd1, schema)
df.show()

当我做这个节目时,它会抛出以下错误。我必须对更大规模的数据执行此操作,并且很难找到正确的解决方案,你能帮助我找到一个解决方案吗?或者任何其他方式,我可以在哪里实现这一点。

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int

提前感谢

简短回答:不能使用自定义类型/格式指定字符串/文本。

您要做的是将字符串解析为sql列。与其他示例的不同之处在于,从csv加载,您正尝试这样做。工作版本可以这样实现:

// skipped other details such as schematype, spark session...
val header = "c1:String|c2:String|c3:Double|c4:Integer"
// Create `Row` from `Seq`
val row = Row.fromSeq(Seq("a|b|44.44|12|"))
// Create `RDD` from `Row`
val rdd: RDD[Row] = spark.sparkContext
.makeRDD(List(row))
.map { row =>
row.getString(0).split("\|") match {
case Array(col1, col2, col3, col4) =>
Row.fromTuple(col1, col2, col3.toDouble, col4.toInt)
}
}
val stt: StructType = StructType(
header
.split("\|")
.map(column => StructField(column, inferType(column), true))
)
val dataFrame = spark.createDataFrame(rdd, stt)
dataFrame.show()

从Scala类型创建Row的原因是在这里引入兼容类型或Row尊重类型
注意,我跳过了与日期和时间相关的字段,日期转换很棘手。你可以在这里查看我的另一个答案如何使用格式化的日期和时间戳

最新更新