使用普通映射/减少功能时,如何访问SparkDataset API中的JSON模式信息



给定有一个消息数据集,以下代码定义:

case class Message(id: Int, value: String)
  var messages = Seq(
  (0, """{"action":"update","timestamp":"2017-10-05T23:01:19Z"}"""),
  (1, """{"action":"update","timestamp":"2017-10-05T23:01:19Z"}""")
).toDF("id", "value").as[Message]

var schema = new StructType().add("action", StringType).add("timestamp", TimestampType)
var res = messages.select(
  from_json(col("value").cast("string"), schema)
)
+------------------------------------+
|jsontostructs(CAST(value AS STRING))|
+------------------------------------+
|                [update,2017-10-0...|
|                [update,2017-10-0...|

在普通地图功能中访问架构信息的最佳方法是什么?该函数本身返回一排丢失了所有类型Infos。为了达到一个值,必须再次指定类型,例如

res.head().getStruct(0).getValuesMap[TimestampType](Seq("timestamp"))
=> Map[String,org.apache.spark.sql.types.TimestampType] = Map(timestamp -> 2017-10-06 01:01:19.0)

res.head().getStruct(0).getString(0)
 => res20: String = update

是否有一些更好的方法可以访问无火花SQL聚合功能的原始JSON数据?

作为经验法则:

  • 使用Collection API(mapflatMapmapPartitionsgroupByKey等)使用强键入API-定义记录类型(案例类工作最佳),该类型反映了架构并使用Encoders来回转换事物:

    case class Value(action: String, timestamp: java.sql.Timestamp)
    case class ParsedMessage(id: Int, value: Option[Value])
    messages.select(
      $"id", from_json(col("value").cast("string"), schema).alias("value")
    ).as[ParsedMessage].map(???)
    
  • 使用Dataset[Row]与高级SQL/DataFrame API(selectwhereagggroupBy)保持一致

相关内容

  • 没有找到相关文章

最新更新