将[String,java.lang.Object]映射到DataFrame架构问题



我必须根据Map[String, Object]的值(而不是键(来确定模式。

示例地图:

val myMap = Map("k1" -> 1, "k2" -> "", "k3"->  new Timestamp(new Date().getTime), "k4" -> 2.0 )

目前,我已经从下面的密钥创建了一个模式:

// I have created a schema using keys
val schema = StructType(myMap.keys.toSeq.map {
StructField(_, StringType) // StringType is wrong since Object in the Map can be of any datatype
}
// I have created a RDD like below
val rdd = sc.parallelize(Seq(Row.fromSeq(myMap.values.toSeq)))
val df = sc.createDataFrame(rdd,schema)

但现在我的问题是,对象可以是双精度、日期、时间戳或任何东西。但是我已经使用如上所述的StringType创建了一个模式,这是错误的。

有没有从作为对象的Map值创建模式的想法?

参考文献:这是来自火花代码的ScalaReflection的dataTypeFor的一个想法

你可以创建这样的结构

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
*createStruct based on datatype
* @param myObject Object
* @return [[DataType]]
*/
def createStruct(myObject: Object): DataType = {
myObject match {
case t if t.isInstanceOf[String] => StringType
case t if t.isInstanceOf[Long] => LongType
case t if t.isInstanceOf[Integer] => IntegerType
case t if t.isInstanceOf[Float] => FloatType
case t if t.isInstanceOf[Double] => DoubleType
case t if t.isInstanceOf[java.sql.Timestamp] => TimestampType
}
}

下面是调用上面函数的示例片段。。

val a: Seq[(Object, Object)] = myMap.keys.toList.zip(columnsMap.values.toList)
logger.info("" + a.toString)
val list = ListBuffer.empty[StructField]
a.foreach { x => {
list += StructField(x._1.toString, createStruct(x._2), false)
//println(createStruct(x._2) + "--" + x.toString())
}
//   )
}
println("list is " + list)
val schema = StructType(list.toList)
println("-----" + schema.treeString)
val df = sparkSession.sqlContext.createDataFrame(rdd, schema)
df.printSchema()
df.show

最新更新