我正在尝试使用Scala&火花
val execQuery = "select * from schema.tablename"
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 19919927).option("upperBound", 28684058).option("numPartitions",30).load()
val yearDFSchema = yearDF.schema
yearDF的模式是:
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: decimal(38,30) (nullable = true)
|-- release_number: decimal(38,30) (nullable = true)
|-- change_number: decimal(38,30) (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: decimal(15,0) (nullable = true)
|-- history_enabled_flag: string (nullable = true)
我们的项目给出的蜂窝上同一个表的模式:
val hiveColumns = source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String
所以我选择了hiveColumns并创建了一个新的StructType,如下所示:
def convertDatatype(datatype: String): DataType = {
val convert = datatype match {
case "string" => StringType
case "bigint" => LongType
case "int" => IntegerType
case "double" => DoubleType
case "date" => TimestampType
case "boolean" => BooleanType
case "timestamp" => TimestampType
}
convert
}
val schemaList = hiveColumns.split("\|")
val newSchema = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
newSchema.printTreeString()
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: double (nullable = true)
|-- release_number: double (nullable = true)
|-- change_number: double (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: long (nullable = true)
|-- history_enabled_flag: string (nullable = true)
当我尝试在yearDF上应用我的新模式:schemaStructType时,如下所示,我得到了异常:
Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
由于将十进制转换为双精度,因此发生异常。我不明白的是,如何将StructType:newSchema
中的列table_refresh_delay_min, release_number, change_number, fdm_application_id
的数据类型从DoubleType
转换为yearDF的Schema中存在的相应数据类型。即
如果yearDFSchema
中的列具有精度大于零的十进制数据类型,在本例中为十进制(38,30),则我需要将newSchema
中的同一列的数据类型转换为DecimalType(38,30)
有人能告诉我怎样才能做到吗?
当您尝试使用开发人员的API函数在RDD[Row]
上应用架构时,会发生类似的错误:
def createDataFrame(rows: List[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
在这种情况下,存储的数据类型必须与官方SQL中列出的外部(即Scala中的值类型)数据类型匹配,并且不应用类型转换或强制。
因此,作为用户,您有责任确保日期和模式是兼容的。
您提供的问题描述表明了完全不同的场景,需要CAST
。让我们使用与您的示例完全相同的模式创建数据集:
val yearDF = spark.createDataFrame(
sc.parallelize(Seq[Row]()),
StructType(Seq(
StructField("source_system_name", StringType),
StructField("table_refresh_delay_min", DecimalType(38, 30)),
StructField("release_number", DecimalType(38, 30)),
StructField("change_number", DecimalType(38, 30)),
StructField("interface_queue_enabled_flag", StringType),
StructField("rework_enabled_flag", StringType),
StructField("fdm_application_id", DecimalType(15, 0)),
StructField("history_enabled_flag", StringType)
)))
yearDF.printSchema
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: decimal(38,30) (nullable = true)
|-- release_number: decimal(38,30) (nullable = true)
|-- change_number: decimal(38,30) (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: decimal(15,0) (nullable = true)
|-- history_enabled_flag: string (nullable = true)
以及所需类型,如
val dtypes = Seq(
"source_system_name" -> "string",
"table_refresh_delay_min" -> "double",
"release_number" -> "double",
"change_number" -> "double",
"interface_queue_enabled_flag" -> "string",
"rework_enabled_flag" -> "string",
"fdm_application_id" -> "long",
"history_enabled_flag" -> "string"
)
然后你就可以映射:
val mapping = dtypes.toMap
yearDF.select(yearDF.columns.map { c => col(c).cast(mapping(c)) }: _*).printSchema
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: double (nullable = true)
|-- release_number: double (nullable = true)
|-- change_number: double (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: long (nullable = true)
|-- history_enabled_flag: string (nullable = true)
当然,这是假设实际类型和所需类型是兼容的,并且允许CAST
。
如果由于特定JDBC驱动程序的特殊性,您仍然遇到问题,您应该考虑直接在查询中手动放置强制转换(在Apache Spark 2.0.0中,是否可以从外部数据库获取查询(而不是获取整个表)?)
val externalDtypes = Seq(
"source_system_name" -> "text",
"table_refresh_delay_min" -> "double precision",
"release_number" -> "float8",
"change_number" -> "float8",
"interface_queue_enabled_flag" -> "string",
"rework_enabled_flag" -> "string",
"fdm_application_id" -> "bigint",
"history_enabled_flag" -> "string"
)
val externalDtypes = dtypes.map {
case (c, t) => s"CAST(`$c` AS $t)"
} .mkString(", ")
val dbTable = s"""(select $fields from schema.tablename) as tmp"""
或通过自定义模式:
spark.read
.format("jdbc")
.option(
"customSchema",
dtypes.map { case (c, t) => s"`$c` $t" } .mkString(", "))
...
.load()