Exploding a JSON array in a Spark Dataset



I am using Spark 2.1 and Zeppelin 0.7 to do the following. (This is inspired by the Databricks tutorial: https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html)

I created the following schema:

import org.apache.spark.sql.types._

val jsonSchema = new StructType()
  .add("Records", ArrayType(new StructType()
    .add("Id", IntegerType)
    .add("eventDt", StringType)
    .add("appId", StringType)
    .add("userId", StringType)
    .add("eventName", StringType)
    .add("eventValues", StringType)
  )
)

and read the following JSON "array" file, which sits in my "inputPath" directory:

{
  "Records": [{
      "Id": 9550,
      "eventDt": "1491810477700",
      "appId": "dandb01",
      "userId": "985580",
      "eventName": "OG: HR: SELECT",
      "eventValues": "985087"
    },
    ... other records
  ]
}

val rawRecords = spark.read.schema(jsonSchema).json(inputPath)

I then want to explode these records to get at the individual events:

val events = rawRecords.select(explode($"Records").as("record"))

However, both rawRecords.show() and events.show() come back empty.
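One way I can see what is actually going wrong is to capture unparseable lines in a corrupt-record column. This is only a diagnostic sketch; the "_corrupt_record" name is my own choice and has to appear both in the schema and in the columnNameOfCorruptRecord option:

import org.apache.spark.sql.types.StringType

// Diagnostic sketch: any line the JSON reader cannot parse ends up as raw text
// in the "_corrupt_record" column instead of silently becoming nulls.
val debugSchema = jsonSchema.add("_corrupt_record", StringType)

val debugRecords = spark.read
  .schema(debugSchema)
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(inputPath)

debugRecords.show(false)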

Any idea what I am doing wrong? I know that in the past I was supposed to use JSON Lines (JSONL) for this, but the Databricks tutorial suggests that recent versions of Spark now support JSON arrays.
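For what it is worth, if the multi-line layout of the file is the problem (Spark 2.1's JSON reader expects one JSON document per line), a workaround sketch would be to read each file whole with wholeTextFiles and hand the strings to the JSON reader; Spark 2.2+ adds a multiLine option that does the same thing directly:

import org.apache.spark.sql.functions.explode
import spark.implicits._

// Workaround sketch for Spark 2.1, assuming each file under inputPath holds a
// single (possibly pretty-printed) JSON document.
val wholeDocs = spark.sparkContext.wholeTextFiles(inputPath).values
val rawRecordsWhole = spark.read.schema(jsonSchema).json(wholeDocs)
val eventsWhole = rawRecordsWhole.select(explode($"Records").as("record"))

// On Spark 2.2 or later the reader can parse multi-line JSON itself:
// val rawRecordsMulti = spark.read.schema(jsonSchema).option("multiLine", true).json(inputPath)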

I did the following:

  1. I have a file foo.txt containing the following data:
{"Records":[{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":">

dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"}]} {"Records":[{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"}]}

  2. I have the following code:

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    val df = sqlContext.read.json("foo.txt")
    df.printSchema()
    df.select(explode($"Records").as("record")).show

  3. I get the following output:

root
 |-- Records: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- appId: string (nullable = true)
 |    |    |-- eventDt: string (nullable = true)
 |    |    |-- eventName: string (nullable = true)
 |    |    |-- eventValues: string (nullable = true)
 |    |    |-- userId: string (nullable = true)

+--------------------+
|              record|
+--------------------+
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
+--------------------+
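
From there the exploded struct can be flattened into ordinary columns, and the explicit schema from the question can be applied in the same way. A short sketch, building on the df and imports from step 2 above:

// Flatten each exploded record struct into top-level columns.
val events = df.select(explode($"Records").as("record"))
  .select("record.*")

events.printSchema()
events.show(false)

// The explicit schema from the question can also be supplied here:
// val df = sqlContext.read.schema(jsonSchema).json("foo.txt")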
