如何在JSON中使用read.schema仅指定特定字段:SPARK Scala



我正在尝试以编程方式在看起来像json的文本文件上强制执行schema(json)。我尝试使用 jsonFile,但问题是从 json 文件列表创建数据帧,spark 必须对数据进行 1 次传递才能为数据帧创建架构。因此,它需要解析所有需要更长时间的数据(由于我的数据被压缩且大小为 TB,因此需要 4 小时)。因此,我想尝试将其读取为textFile并强制执行架构以单独获取感兴趣的字段,以便稍后查询生成的数据框。但我不确定如何将其映射到输入。有些人可以给我一些关于如何将架构映射到 json 的参考,比如输入。

输入:

这是完整的架构:

records: org.apache.spark.sql.DataFrame = [country: string, countryFeatures: string, customerId: string, homeCountry: string, homeCountryFeatures: string, places: array<struct<freeTrial:boolean,placeId:string,placeRating:bigint>>, siteName: string, siteId: string, siteTypeId: string, Timestamp: bigint, Timezone: string, countryId: string, pageId: string, homeId: string, pageType: string, model: string, requestId: string, sessionId: string, inputs: array<struct<inputName:string,inputType:string,inputId:string,offerType:string,originalRating:bigint,processed:boolean,rating:bigint,score:double,methodId:string>>] 

但我只对以下几个领域感兴趣:

res45: Array[String] = Array({"requestId":"bnjinmm","siteName":"bueller","pageType":"ad","model":"prepare","inputs":[{"methodId":"436136582","inputType":"US","processed":true,"rating":0,"originalRating":1},{"methodId":"23232322","inputType":"UK","processed":falase,"rating":0,"originalRating":1}]

 val  records = sc.textFile("s3://testData/sample.json.gz")
  val schema = StructType(Array(StructField("requestId",StringType,true),
                          StructField("siteName",StringType,true),
                          StructField("model",StringType,true),
                          StructField("pageType",StringType,true),
                          StructField("inputs", ArrayType(
                                StructType(
                                            StructField("inputType",StringType,true), 
                                            StructField("originalRating",LongType,true), 
                                            StructField("processed",BooleanType,true), 
                                            StructField("rating",LongType,true), 
                                            StructField("methodId",StringType,true)
                                            ),true),true)))
    val rowRDD = ?? 
    val inputRDD = sqlContext.applySchema(rowRDD, schema)
    inputRDD.registerTempTable("input")
     sql("select * from input").foreach(println)

有什么办法可以映射这个吗?还是我需要使用儿子解析器或其他东西。我只想使用文本文件,因为约束。

尝试使用 :

val  records =sqlContext.read.schema(schema).json("s3://testData/test2.gz")

但是不断收到错误:

<console>:37: error: overloaded method value apply with alternatives:
     (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
      (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
      (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
     cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
           StructField("inputs",ArrayType(StructType(StructField("inputType",StringType,true), StructField("originalRating",LongType,true), StructField("processed",BooleanType,true), StructField("rating",LongType,true), StructField("score",DoubleType,true), StructField("methodId",StringType,true)),true),true)))
                                              ^

它可以使用以下具有预定义架构的代码加载,Spark不需要遍历ZIP文件中的文件。问题中的代码有歧义。

import org.apache.spark.sql.types._
val input = StructType(
                Array(
                    StructField("inputType",StringType,true), 
                    StructField("originalRating",LongType,true), 
                    StructField("processed",BooleanType,true), 
                    StructField("rating",LongType,true), 
                    StructField("score",DoubleType,true), 
                    StructField("methodId",StringType,true)
                )
            )
 val schema = StructType(Array(
    StructField("requestId",StringType,true),
    StructField("siteName",StringType,true),
    StructField("model",StringType,true),
    StructField("inputs",
        ArrayType(input,true),
                true)
    )
)
val  records =sqlContext.read.schema(schema).json("s3://testData/test2.gz")

并非所有字段都需要提供。虽然如果可能的话,最好提供所有内容。

Spark 最好解析所有行,如果某些行无效。它将_corrupt_record添加为包含整行的列。虽然它是普通的 json 文件文件。

最新更新