如何使用schemanames
中提到的架构详细信息将input5
数据格式转换为数据框架?
不使用Row(r(0),r(1))
的转换应该是动态的 - 列数可以增加或减少输入和模式,因此代码应动态。
case class Entry(schemaName: String, updType: String, ts: Long, row: Map[String, String])
val input5 = List(Entry("a","b",0,Map("col1 " -> "0000555", "ref" -> "2017-08-12 12:12:12.266528")))
val schemanames= "col1,ref"
目标数据帧应仅来自输入5的地图(例如col1
和ref
)。还有许多其他列(例如col2
,col3
...)。如果地图中有更多列,则同一列将在模式名称中提及。
架构名称变量应用于创建结构,Input5.ROW(MAP)应该是数据源...由于模式名称中的列数可以在100中,因此适用于Input5.Row中的数据。
,只要它们都是字符串,这将适用于任意数量的列,并且每个 Entry
包含一个具有这些列的 ALL 的映射:
// split to column names:
val columns = schemanames.split(",")
// create the DataFrame schema with these columns (in that order)
val schema = StructType(columns.map(StructField(_, StringType)))
// convert input5 to Seq[Row], while selecting the values from "row" Map in same order of columns
val rows = input5.map(_.row)
.map(valueMap => columns.map(valueMap.apply).toSeq)
.map(Row.fromSeq)
// finally - create dataframe
val dataframe = spark.createDataFrame(sc.parallelize(rows), schema)
您可以通过schemanames
中的条目(可能是根据您的描述中的地图中选择的键)以及用于地图操作的UDF来组装数据框,如下所示:
case class Entry(schemaName: String, updType: String, ts: Long, row: Map[String, String])
val input5 = List(
Entry("a", "b", 0, Map("col1" -> "0000555", "ref" -> "2017-08-12 12:12:12.266528")),
Entry("c", "b", 1, Map("col1" -> "0000444", "col2" -> "0000444", "ref" -> "2017-08-14 14:14:14.0")),
Entry("a", "d", 0, Map("col2" -> "0000666", "ref" -> "2017-08-16 16:16:16.0")),
Entry("e", "f", 0, Map("col1" -> "0000777", "ref" -> "2017-08-17 17:17:17.0", "others" -> "x"))
)
val schemanames= "col1, ref"
// Create dataframe from input5
val df = input5.toDF
// A UDF to get column value from Map
def getColVal(c: String) = udf(
(m: Map[String, String]) =>
m.get(c).getOrElse("n/a")
)
// Add columns based on entries in schemanames
val df2 = schemanames.split(",").map(_.trim).
foldLeft( df )(
(acc, c) => acc.withColumn( c, getColVal(c)(df("row"))
))
val df3 = df2.select(cols.map(c => col(c)): _*)
df3.show(truncate=false)
+-------+--------------------------+
|col1 |ref |
+-------+--------------------------+
|0000555|2017-08-12 12:12:12.266528|
|0000444|2017-08-14 14:14:14.0 |
|n/a |2017-08-16 16:16:16.0 |
|0000777|2017-08-17 17:17:17.0 |
+-------+--------------------------+