Spark SQL读取了一个已经转义了双引号的JSON文件



我有一个简单的Spark程序,它可以读取JSON文件并发出CSV文件。在 JSON 文件中,数据被转义为双引号。Spark 程序无法将该行读取为有效的 JSON 字符串。

input.json

{"key" : "k1", "value1": "Good String", "value2": "Good String"}

input_1.json

"{"key" : "k1", "value1": "Good String", "value2": "Good String"}"

输出.csv - 数据作为损坏的记录返回

_corrupt_record,key,value1,value2
"{\"key\" : \"k1\", \"value1\": \"Good String\", \"value2\": \"Good String\"}",,,

意料之中.csv

,k1,Good String,Good String

请查看下面的主要代码并建议

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder()
            .appName(TestSpark.class.getName()).master("local[1]").getOrCreate();
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    List<StructField> kvFields = new ArrayList<>();
    kvFields.add(DataTypes.createStructField("_corrupt_record", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("key", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value1", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value2", DataTypes.StringType, true));
    StructType employeeSchema = DataTypes.createStructType(kvFields);
    Dataset<Row> dataset = sparkSession.read()
                    .option("inferSchema", false)
                    .format("json")
                    .schema(employeeSchema)
                    .load("D:\dev\workspace\java\simple-kafka\key_value.json");
    dataset.createOrReplaceTempView("sourceView");
    sqlCtx.sql("select * from sourceView")
            .write()
            .option("header", true)
            .format("csv")
            .save("D:\dev\workspace\java\simple-kafka\output\" + UUID.randomUUID().toString());
    sparkSession.close();
}

您需要将其读取为文本并手动解析。我不将Spark与Java一起使用,但这里是Scala等效项,您可以将其用作伪代码:

val rdd: RDD[MyClass] = sc.textFile(path)
  .map { line =>
    val json = ... // turn line into valid json
    Try(parse(json))
      .recover {
        case NonFatal(ex) => // handle parse error
      }
      .map { jvalue =>
        // Convert the JSON into a case class
      }
      .get
  }
val ds: Dataset[MyClass] = spark.createDataset(rdd)

相关内容

  • 没有找到相关文章

最新更新