如何将from_json与架构一起使用为字符串(即JSON编码的架构)



我正在阅读来自kafka的流,然后我从kafka(json)中转换为结构。

from_json的变体可以采用类型String的模式,但找不到样本。请告知以下代码中有什么问题。

错误

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',
== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING )  )  ) 
-------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)

程序

public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";
    SparkSession sparkSession = SparkSession
            .builder().appName(EmployeeSchemaLoader.class.getName())
            .master(master).getOrCreate();
   String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
            "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING )  )  ) ";
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    Dataset<Row> employeeDataset = sparkSession.readStream().
            format("kafka").
            option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics).load();
    employeeDataset.printSchema();
    employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"),employeeSchema, new HashMap<>()));
    employeeDataset.printSchema();
    employeeDataset.createOrReplaceTempView("employeeView");
    sparkSession.catalog().listTables().show();
    sqlCtx.sql("select * from employeeView").show();
}

您的问题帮助我发现from_json的变体具有基于String的模式,仅在Java中可用,最近在即将到来的2.3.0中添加了Scala的API。我很久以来一直坚信Scara的Spark API始终是最丰富的功能,您的问题帮助我学习了它不应该在2.3.0(!)(!)

的变化之前就这样做。

回到您的问题,您可以以JSON或DDL格式定义基于字符串的模式。

手工编写JSON可能有点麻烦,所以我会采取不同的方法(鉴于我是Scala开发人员非常容易)。

让我们首先使用Scara的Spark API定义模式。

import org.apache.spark.sql.types._
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)

似乎与您的模式相匹配,不是吗?

使用该架构转换为JSON编码的字符串,用json方法轻而易举。

val schemaAsJson = schema.json

schemaAsJson正是您的JSON字符串,看起来很漂亮...嗯...复杂。出于显示目的,我宁愿使用prettyJson方法。

scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}

那是您在JSON中的模式。

您可以使用DataType和"验证"JSON String(使用Spark在from_json的封面下使用的Datatype.fromjson)。

import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>

一切都很好。请注意,如果我使用示例数据集检查一下?

val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "jacek@japila.pl",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses")  // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName|          email|  city|state|   zip|
+---------+---------+---------------+------+-----+------+
|    Jacek|Laskowski|jacek@japila.pl|Warsaw|  N/A|02-791|
+---------+---------+---------------+------+-----+------+

相关内容

  • 没有找到相关文章

最新更新