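I am reading a stream from Kafka and then converting the value from Kafka (JSON) into a struct.
There is a variant of from_json that takes a schema of type String, but I could not find a sample. Please advise what is wrong in the code below.
Error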
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',
== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING ) ) )
-------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
Program
public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";
    SparkSession sparkSession = SparkSession
            .builder().appName(EmployeeSchemaLoader.class.getName())
            .master(master).getOrCreate();
    String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
            "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING ) ) ) ";
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    Dataset<Row> employeeDataset = sparkSession.readStream().
            format("kafka").
            option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics).load();
    employeeDataset.printSchema();
    employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"), employeeSchema, new HashMap<>()));
    employeeDataset.printSchema();
    employeeDataset.createOrReplaceTempView("employeeView");
    sparkSession.catalog().listTables().show();
    sqlCtx.sql("select * from employeeView").show();
}
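Your question helped me discover that the variant of from_json with a String-based schema was available only in Java and has only recently been added to the Spark API for Scala, in the upcoming 2.3.0. I had long lived with the strong belief that the Spark API for Scala was always the most feature-rich, and your question helped me learn that this was not so before 2.3.0 (!)
Back to your question: you can define the string-based schema in either JSON or DDL format.
Writing JSON by hand can be a bit cumbersome, so I will take a different approach (which, given that I am a Scala developer, is fairly easy).
Let's first define the schema using the Spark API for Scala.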
import org.apache.spark.sql.types._
// The $"name".string syntax relies on spark.implicits._, which spark-shell imports automatically
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)
That seems to match your schema, doesn't it?
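Outside spark-shell the $"..." column syntax is not available without importing the session implicits, so here is a minimal sketch of the same schema built from plain StructField values. The object name EmployeeSchema is only an illustration and is not part of the original answer; it assumes spark-sql is on the classpath.

```scala
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Hypothetical holder object, named here for illustration only
object EmployeeSchema {
  // Element type of the addresses array
  val addressSchema: StructType = StructType(Seq(
    StructField("city", StringType),
    StructField("state", StringType),
    StructField("zip", StringType)))

  // Top-level schema; StructField is nullable by default and
  // ArrayType(elementType) allows null elements by default
  val schema: StructType = StructType(Seq(
    StructField("firstName", StringType),
    StructField("lastName", StringType),
    StructField("email", StringType),
    StructField("addresses", ArrayType(addressSchema))))

  def main(args: Array[String]): Unit =
    schema.printTreeString()
}
```

Building the schema this way needs no SparkSession, which can be convenient when the schema lives in a shared module.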
With the schema in place, converting it to a JSON-encoded string is a breeze with the json method.
val schemaAsJson = schema.json
schemaAsJson is exactly your JSON string, and it looks pretty...hmm...complex. For display purposes I would rather use the prettyJson method.
scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}
That is your schema in JSON.
You can use DataType to "validate" the JSON string (with DataType.fromJson, which Spark uses under the covers for from_json).
import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>
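Note that the dt.sql output above writes nested types with angle brackets, whereas the schema string in the question uses parentheses, which appears to be what the parser rejects at the position marked in the error. As a minimal sketch of the DDL route, assuming a Spark version that provides StructType.fromDDL, the same schema can be written as a comma-separated field list and compared against its JSON round trip. The object name SchemaStrings is hypothetical.

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical holder object, named here for illustration only
object SchemaStrings {
  // DDL form: a comma-separated field list; nested types use angle brackets
  val ddl: String =
    "firstName STRING, lastName STRING, email STRING, " +
      "addresses ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>"

  // Parse the DDL string into a StructType
  val fromDdl: StructType = StructType.fromDDL(ddl)

  // JSON form: serialize the parsed schema and read it back
  val fromJson: DataType = DataType.fromJson(fromDdl.json)

  def main(args: Array[String]): Unit = {
    println(fromDdl.simpleString)
    println(fromJson == fromDdl)
  }
}
```

Either string form describes the same schema, so the choice mostly comes down to which one is easier to maintain by hand.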
All seems fine. Mind if I check this out with a sample dataset?
val rawJsons = Seq("""
{
"firstName" : "Jacek",
"lastName" : "Laskowski",
"email" : "jacek@japila.pl",
"addresses" : [
{
"city" : "Warsaw",
"state" : "N/A",
"zip" : "02-791"
}
]
}
""").toDF("rawjson")
val people = rawJsons
.select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
.select("json.*") // <-- flatten the struct field
.withColumn("address", explode($"addresses")) // <-- explode the array field
.drop("addresses") // <-- no longer needed
.select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName| email| city|state| zip|
+---------+---------+---------------+------+-----+------+
| Jacek|Laskowski|jacek@japila.pl|Warsaw| N/A|02-791|
+---------+---------+---------------+------+-----+------+
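To tie this back to the Kafka case in the question, here is a minimal sketch of a helper that applies a string-based schema to a Kafka-style value column. It assumes Spark 2.3 or later, where from_json accepts a schema string with a Scala Map of options; the names ParseEmployees and parseEmployees are hypothetical, and a static DataFrame stands in for the Kafka source.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical helper object, named here for illustration only
object ParseEmployees {
  // Schema as a DDL field list (a JSON schema string should work as well)
  val ddl: String =
    "firstName STRING, lastName STRING, email STRING, " +
      "addresses ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>"

  // Expects `df` to have a `value` column holding the JSON payload,
  // as a Kafka source provides; returns one column per top-level field
  def parseEmployees(df: DataFrame, schemaString: String): DataFrame =
    df.select(
        from_json(col("value").cast("string"), schemaString, Map.empty[String, String])
          .as("employeeRecord"))
      .select("employeeRecord.*")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ParseEmployees")
      .getOrCreate()
    import spark.implicits._

    // Static stand-in for the Kafka stream
    val sample = Seq(
      """{"firstName":"Jacek","lastName":"Laskowski","email":"jacek@japila.pl",""" +
        """"addresses":[{"city":"Warsaw","state":"N/A","zip":"02-791"}]}"""
    ).toDF("value")

    parseEmployees(sample, ddl).show(truncate = false)
    spark.stop()
  }
}
```

The same helper could be applied to the Dataset returned by readStream, but a streaming Dataset has to be written out with writeStream and start() rather than displayed with show().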