如何比较Scala中的两种结构类型并更改Scala中列的数据类型



我正在尝试使用Scala&火花

val execQuery    = "select * from schema.tablename"
val yearDF       = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 19919927).option("upperBound", 28684058).option("numPartitions",30).load()
val yearDFSchema = yearDF.schema

yearDF的模式是:

root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: decimal(38,30) (nullable = true)
|-- release_number: decimal(38,30) (nullable = true)
|-- change_number: decimal(38,30) (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: decimal(15,0) (nullable = true)
|-- history_enabled_flag: string (nullable = true)

我们的项目给出的蜂窝上同一个表的模式:

val hiveColumns = source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String

所以我选择了hiveColumns并创建了一个新的StructType,如下所示:

def convertDatatype(datatype: String): DataType = {
val convert = datatype match {
case "string"     => StringType
case "bigint"     => LongType
case "int"        => IntegerType
case "double"     => DoubleType
case "date"       => TimestampType
case "boolean"    => BooleanType
case "timestamp"  => TimestampType
}
convert
}

val schemaList = hiveColumns.split("\|")
val newSchema  = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
newSchema.printTreeString()
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: double (nullable = true)
|-- release_number: double (nullable = true)
|-- change_number: double (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: long (nullable = true)
|-- history_enabled_flag: string (nullable = true)

当我尝试在yearDF上应用我的新模式:schemaStructType时,如下所示,我得到了异常:

Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double

由于将十进制转换为双精度,因此发生异常。我不明白的是,如何将StructType:newSchema中的列table_refresh_delay_min, release_number, change_number, fdm_application_id的数据类型从DoubleType转换为yearDF的Schema中存在的相应数据类型。即

如果yearDFSchema中的列具有精度大于零的十进制数据类型,在本例中为十进制(38,30),则我需要将newSchema中的同一列的数据类型转换为DecimalType(38,30)

有人能告诉我怎样才能做到吗?

当您尝试使用开发人员的API函数在RDD[Row]上应用架构时,会发生类似的错误:

def createDataFrame(rows: List[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

在这种情况下,存储的数据类型必须与官方SQL中列出的外部(即Scala中的值类型)数据类型匹配,并且不应用类型转换或强制。

因此,作为用户,您有责任确保日期和模式是兼容的。

您提供的问题描述表明了完全不同的场景,需要CAST。让我们使用与您的示例完全相同的模式创建数据集:

val yearDF = spark.createDataFrame(
sc.parallelize(Seq[Row]()),
StructType(Seq(
StructField("source_system_name", StringType),
StructField("table_refresh_delay_min", DecimalType(38, 30)),
StructField("release_number", DecimalType(38, 30)),
StructField("change_number", DecimalType(38, 30)),
StructField("interface_queue_enabled_flag", StringType),
StructField("rework_enabled_flag", StringType),
StructField("fdm_application_id", DecimalType(15, 0)),
StructField("history_enabled_flag", StringType)
)))
yearDF.printSchema
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: decimal(38,30) (nullable = true)
|-- release_number: decimal(38,30) (nullable = true)
|-- change_number: decimal(38,30) (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: decimal(15,0) (nullable = true)
|-- history_enabled_flag: string (nullable = true)

以及所需类型,如

val dtypes = Seq(
"source_system_name" -> "string",
"table_refresh_delay_min" -> "double",
"release_number" -> "double",
"change_number" -> "double",
"interface_queue_enabled_flag" -> "string",
"rework_enabled_flag" -> "string",
"fdm_application_id" -> "long",
"history_enabled_flag" -> "string"
)

然后你就可以映射:

val mapping = dtypes.toMap
yearDF.select(yearDF.columns.map { c => col(c).cast(mapping(c)) }: _*).printSchema
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: double (nullable = true)
|-- release_number: double (nullable = true)
|-- change_number: double (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: long (nullable = true)
|-- history_enabled_flag: string (nullable = true)

当然,这是假设实际类型和所需类型是兼容的,并且允许CAST

如果由于特定JDBC驱动程序的特殊性,您仍然遇到问题,您应该考虑直接在查询中手动放置强制转换(在Apache Spark 2.0.0中,是否可以从外部数据库获取查询(而不是获取整个表)?)

val externalDtypes = Seq(
"source_system_name" -> "text",
"table_refresh_delay_min" -> "double precision",
"release_number" -> "float8",
"change_number" -> "float8",
"interface_queue_enabled_flag" -> "string",
"rework_enabled_flag" -> "string",
"fdm_application_id" -> "bigint",
"history_enabled_flag" -> "string"
)
val externalDtypes = dtypes.map { 
case (c, t) => s"CAST(`$c` AS $t)" 
} .mkString(", ")
val dbTable = s"""(select $fields from schema.tablename) as tmp"""

或通过自定义模式:

spark.read
.format("jdbc")
.option(
"customSchema",
dtypes.map { case (c, t) => s"`$c` $t" } .mkString(", "))
...
.load()

最新更新