Re-using the schema from JSON within a Spark DataFrame using Scala

I have some JSON data that looks like this:

{"gid":"111","createHour":"2014-10-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:40:37.0"},{"revId":"4","modDate":"2014-11-20 01:40:40.0"}],"comments":[],"replies":[]}
{"gid":"222","createHour":"2014-12-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:39:31.0"},{"revId":"4","modDate":"2014-11-20 01:39:34.0"}],"comments":[],"replies":[]}
{"gid":"333","createHour":"2015-01-21 00:00:00.0","revisions":[{"revId":"25","modDate":"2014-11-21 00:34:53.0"},{"revId":"110","modDate":"2014-11-21 00:47:10.0"}],"comments":[{"comId":"4432","content":"How are you?"}],"replies":[{"repId":"4441","content":"I am good."}]}
{"gid":"444","createHour":"2015-09-20 23:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 23:23:47.0"}],"comments":[],"replies":[]}
{"gid":"555","createHour":"2016-01-21 01:00:00.0","revisions":[{"revId":"135","modDate":"2014-11-21 01:01:58.0"}],"comments":[],"replies":[]}
{"gid":"666","createHour":"2016-04-23 19:00:00.0","revisions":[{"revId":"136","modDate":"2014-11-23 19:50:51.0"}],"comments":[],"replies":[]}

I can read this in with:

val df = sqlContext.read.json("./data/full.json")

I can print the schema with df.printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I can show the data with df.show(10, false):

+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|comments             |createHour           |gid|replies            |revisions                                                |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|[]                   |2014-10-20 01:00:00.0|111|[]                 |[[2014-11-20 01:40:37.0,2], [2014-11-20 01:40:40.0,4]]   |
|[]                   |2014-12-20 01:00:00.0|222|[]                 |[[2014-11-20 01:39:31.0,2], [2014-11-20 01:39:34.0,4]]   |
|[[4432,How are you?]]|2015-01-21 00:00:00.0|333|[[I am good.,4441]]|[[2014-11-21 00:34:53.0,25], [2014-11-21 00:47:10.0,110]]|
|[]                   |2015-09-20 23:00:00.0|444|[]                 |[[2014-11-20 23:23:47.0,2]]                              |
|[]                   |2016-01-21 01:00:00.0|555|[]                 |[[2014-11-21 01:01:58.0,135]]                            |
|[]                   |2016-04-23 19:00:00.0|666|[]                 |[[2014-11-23 19:50:51.0,136]]                            |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+

I can grab the schema with val dfSc = df.schema, which prints/reads as:

StructType(StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true), StructField(createHour,StringType,true), StructField(gid,StringType,true), StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true), StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true))

I can print it out more readably:

println(df.schema.fields.mkString(",\n"))
StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true),
StructField(createHour,StringType,true),
StructField(gid,StringType,true),
StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true),
StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)

Now, if I read the same file without the comments and replies rows, using val df2 = sqlContext.read.json("./data/partialRevOnly.json") where those rows have simply been deleted, I get this with printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I don't like that, so I use:

val df3 = sqlContext.read.
  schema(dfSc).
  json("./data/partialRevOnly.json")

where dfSc is the original schema. So now I get back the schema I had before the data was deleted:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

This is perfect... almost. I would like to assign this schema to a variable, something like this:

val textSc =  StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true),
    StructField(createHour,StringType,true),
    StructField(gid,StringType,true),
    StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true),
    StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)

OK, that won't work because of the double quotes and "some other structural" things, so try this instead (which gives an error):

import org.apache.spark.sql.types._
val textSc = StructType(Array(
    StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true),
    StructField("createHour",StringType,true),
    StructField("gid",StringType,true),
    StructField("replies",ArrayType(StructType(StructField("content",StringType,true), StructField("repId",StringType,true)),true),true),
    StructField("revisions",ArrayType(StructType(StructField("modDate",StringType,true), StructField("revId",StringType,true)),true),true)
))
Name: Compile Error
Message: <console>:78: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
           StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true),

Without this error (which I haven't been able to fix quickly), I would like to use textSc in place of dfSc to read the JSON data with an imposed schema.

I can't find a "1-to-1 match" way of getting the schema (via println or otherwise) in syntax that is usable as-is (somewhat like the above). I suppose some coding could do the case matching to strip out the double quotes. However, I'm still unclear on what rules are needed to get the exact schema out of the test fixture so that I can simply re-use it in my recurring production (as opposed to test fixture) code. Is there a way to get this schema to print exactly as I would code it?

Note: that includes the double quotes and all the proper StructFields/types and so forth, so that it's compatible as code.
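
For illustration, here is a rough sketch of the kind of case matching I have in mind (the helper name typeToCode is mine, not a Spark API, it is untested, and it only handles the types that appear in this schema):

import org.apache.spark.sql.types._

// Recursively walk a DataType and emit Scala source text that
// reconstructs it, double quotes included.
def typeToCode(dt: DataType): String = dt match {
  case st: StructType =>
    st.fields
      .map(f => s"""StructField("${f.name}", ${typeToCode(f.dataType)}, ${f.nullable})""")
      .mkString("StructType(Seq(\n  ", ",\n  ", "\n))")
  case ArrayType(elementType, containsNull) =>
    s"ArrayType(${typeToCode(elementType)}, $containsNull)"
  case other =>
    other.toString // atomic types like StringType already print as valid code
}

println(typeToCode(df.schema))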

As a sidebar, I thought about saving a fully-formed golden JSON file to use at the start of the Spark job, but I would eventually like to use date fields and other more concise types instead of strings at the applicable structural locations.
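
For example, a hand-typed variant of the schema using TimestampType where the data really is a timestamp might look like the sketch below (depending on the Spark version, the JSON reader may also need a matching timestampFormat option for values like "2014-10-20 01:00:00.0" to parse):

import org.apache.spark.sql.types._

// Sketch: same shape as the golden schema, but with timestamps
// instead of strings for createHour and modDate.
val typedSc = StructType(Seq(
  StructField("comments", ArrayType(StructType(Seq(
    StructField("comId", StringType, true),
    StructField("content", StringType, true))), true), true),
  StructField("createHour", TimestampType, true),
  StructField("gid", StringType, true),
  StructField("replies", ArrayType(StructType(Seq(
    StructField("content", StringType, true),
    StructField("repId", StringType, true))), true), true),
  StructField("revisions", ArrayType(StructType(Seq(
    StructField("modDate", TimestampType, true),
    StructField("revId", StringType, true))), true), true)))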

How can I get the DataFrame information from my test harness (using fully-formed JSON input rows with comments and replies) to the point where I can drop the schema in as source code in my production Scala Spark job?

Note: the best answer is some coding means, but an explanation that lets me trudge, plod, toil, wade, and slog my way through the coding is helpful as well. :)

I recently ran into this. I'm using Spark 2.0.2, so I don't know whether this solution works with earlier versions.

import scala.util.Try
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.types.{DataType, StructType}
/** Produce a Schema string from a Dataset */
def serializeSchema(ds: Dataset[_]): String = ds.schema.json
/** Produce a StructType schema object from a JSON string */
def deserializeSchema(json: String): StructType = {
    Try(DataType.fromJson(json)).getOrElse(LegacyTypeStringParser.parse(json)) match {
        case t: StructType => t
        case _ => throw new RuntimeException(s"Failed parsing StructType: $json")
    }
}

Note that I just copied the "deserialize" function from a private function in the Spark StructType object. I don't know how well it will be supported across versions.
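
As a usage sketch (the schema file path is made up), the test harness can persist the schema from the golden DataFrame, and the production job can load it back and impose it on the read:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// In the test harness: write out the schema of the fully-formed DataFrame.
Files.write(Paths.get("./data/schema.json"),
  serializeSchema(df).getBytes(StandardCharsets.UTF_8))

// In the production job: read the schema back and use it for the JSON load.
val schemaJson = new String(
  Files.readAllBytes(Paths.get("./data/schema.json")), StandardCharsets.UTF_8)
val df3 = sqlContext.read
  .schema(deserializeSchema(schemaJson))
  .json("./data/partialRevOnly.json")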

Well, the error message should tell you everything you need to know here: StructType expects a sequence of fields as an argument. So in your case the schema should look like this:

StructType(Seq(
  StructField("comments", ArrayType(StructType(Seq(       // <- Seq[StructField]
    StructField("comId", StringType, true),
    StructField("content", StringType, true))), true), true), 
  StructField("createHour", StringType, true),
  StructField("gid", StringType, true),
  StructField("replies", ArrayType(StructType(Seq(        // <- Seq[StructField]
    StructField("content", StringType, true),
    StructField("repId", StringType, true))), true), true),
  StructField("revisions", ArrayType(StructType(Seq(      // <- Seq[StructField]
    StructField("modDate", StringType, true),
    StructField("revId", StringType, true))),true), true)))
