给定有一个消息数据集,以下代码定义:
case class Message(id: Int, value: String)
var messages = Seq(
(0, """{"action":"update","timestamp":"2017-10-05T23:01:19Z"}"""),
(1, """{"action":"update","timestamp":"2017-10-05T23:01:19Z"}""")
).toDF("id", "value").as[Message]
var schema = new StructType().add("action", StringType).add("timestamp", TimestampType)
var res = messages.select(
from_json(col("value").cast("string"), schema)
)
+------------------------------------+
|jsontostructs(CAST(value AS STRING))|
+------------------------------------+
| [update,2017-10-0...|
| [update,2017-10-0...|
在普通地图功能中访问架构信息的最佳方法是什么?该函数本身返回一排丢失了所有类型Infos。为了达到一个值,必须再次指定类型,例如
res.head().getStruct(0).getValuesMap[TimestampType](Seq("timestamp"))
=> Map[String,org.apache.spark.sql.types.TimestampType] = Map(timestamp -> 2017-10-06 01:01:19.0)
或
res.head().getStruct(0).getString(0)
=> res20: String = update
是否有一些更好的方法可以访问无火花SQL聚合功能的原始JSON数据?
作为经验法则:
-
使用Collection API(
map
,flatMap
,mapPartitions
,groupByKey
等)使用强键入API-定义记录类型(案例类工作最佳),该类型反映了架构并使用Encoders
来回转换事物:case class Value(action: String, timestamp: java.sql.Timestamp) case class ParsedMessage(id: Int, value: Option[Value]) messages.select( $"id", from_json(col("value").cast("string"), schema).alias("value") ).as[ParsedMessage].map(???)
-
使用
Dataset[Row]
与高级SQL/DataFrame
API(select
,where
,agg
,groupBy
)保持一致