从n个元素的图中创建数据框架,该元素具有n个元素的示意图



如何使用schemanames中提到的架构详细信息将input5数据格式转换为数据框架?

不使用Row(r(0),r(1))的转换应该是动态的 - 列数可以增加或减少输入和模式,因此代码应动态。

case class Entry(schemaName: String, updType: String, ts: Long, row: Map[String, String])
val input5 = List(Entry("a","b",0,Map("col1 " -> "0000555", "ref" -> "2017-08-12 12:12:12.266528")))  
val schemanames= "col1,ref"     

目标数据帧应仅来自输入5的地图(例如col1ref)。还有许多其他列(例如col2col3 ...)。如果地图中有更多列,则同一列将在模式名称中提及。

架构名称变量应用于创建结构,Input5.ROW(MAP)应该是数据源...由于模式名称中的列数可以在100中,因此适用于Input5.Row中的数据。

,只要它们都是字符串,这将适用于任意数量的列,并且每个 Entry包含一个具有这些列的 ALL 的映射:

// split to column names:
val columns = schemanames.split(",")
// create the DataFrame schema with these columns (in that order)
val schema = StructType(columns.map(StructField(_, StringType)))
// convert input5 to Seq[Row], while selecting the values from "row" Map in same order of columns
val rows = input5.map(_.row)
  .map(valueMap => columns.map(valueMap.apply).toSeq)
  .map(Row.fromSeq)
// finally - create dataframe
val dataframe = spark.createDataFrame(sc.parallelize(rows), schema)

您可以通过schemanames中的条目(可能是根据您的描述中的地图中选择的键)以及用于地图操作的UDF来组装数据框,如下所示:

case class Entry(schemaName: String, updType: String, ts: Long, row: Map[String, String])
val input5 = List(
  Entry("a", "b", 0, Map("col1" -> "0000555", "ref" -> "2017-08-12 12:12:12.266528")),
  Entry("c", "b", 1, Map("col1" -> "0000444", "col2" -> "0000444", "ref" -> "2017-08-14 14:14:14.0")),
  Entry("a", "d", 0, Map("col2" -> "0000666", "ref" -> "2017-08-16 16:16:16.0")),
  Entry("e", "f", 0, Map("col1" -> "0000777", "ref" -> "2017-08-17 17:17:17.0", "others" -> "x"))
)  
val schemanames= "col1, ref"
// Create dataframe from input5
val df = input5.toDF
// A UDF to get column value from Map
def getColVal(c: String) = udf(
  (m: Map[String, String]) =>
     m.get(c).getOrElse("n/a")
)
// Add columns based on entries in schemanames
val df2 = schemanames.split(",").map(_.trim).
  foldLeft( df )(
    (acc, c) => acc.withColumn( c, getColVal(c)(df("row"))
  ))
val df3 = df2.select(cols.map(c => col(c)): _*)
df3.show(truncate=false)
+-------+--------------------------+
|col1   |ref                       |
+-------+--------------------------+
|0000555|2017-08-12 12:12:12.266528|
|0000444|2017-08-14 14:14:14.0     |
|n/a    |2017-08-16 16:16:16.0     |
|0000777|2017-08-17 17:17:17.0     |
+-------+--------------------------+

相关内容

  • 没有找到相关文章

最新更新