Spark - Reading a JSON array from a column



Using Spark 2.11, I have the following Dataset (read from a Cassandra table):

+-----------+-------------------------------------------------------+
|id         |attributes                                             |
+-----------+-------------------------------------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|
+-----------+-------------------------------------------------------+

printSchema():

root
|-- id: string (nullable = true)
|-- attributes: string (nullable = true)

The attributes column is an array of JSON objects. I have tried to turn it into a Dataset, but it keeps failing. I tried defining the schema like this:

StructType type = new StructType()
    .add("id", new IntegerType(), false)
    .add("name", new StringType(), false)
    .add("score", new FloatType(), false)
    .add("snippets", new IntegerType(), false);

ArrayType schema = new ArrayType(type, false);

and passed it to from_json like this:

df = df.withColumn("val", functions.from_json(df.col("attributes"), schema));

but I get a MatchError:

Exception in thread "main" scala.MatchError: org.apache.spark.sql.types.IntegerType@43756cb (of class org.apache.spark.sql.types.IntegerType)

What is the correct way to do this?

You can specify the schema like this:

import org.apache.spark.sql.functions.{col, from_json, lit}
import org.apache.spark.sql.types._

val schema = ArrayType(
  StructType(Array(
    StructField("id", IntegerType, false),
    StructField("name", StringType, false),
    StructField("score", FloatType, false),
    StructField("snippets", IntegerType, false)
  )),
  false
)
val df1 = df.withColumn("val", from_json(col("attributes"), schema))
df1.show(false)
//+-----------+------------------------------------------------------+------------------------+
//|id         |attributes                                            |val                     |
//+-----------+------------------------------------------------------+------------------------+
//|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
//+-----------+------------------------------------------------------+------------------------+

For Java:

import static org.apache.spark.sql.types.DataTypes.*;
import java.util.Arrays;

ArrayType schema = createArrayType(createStructType(Arrays.asList(
    createStructField("id", IntegerType, false),
    createStructField("name", StringType, false),
    createStructField("score", FloatType, false),
    createStructField("snippets", IntegerType, false)
)), false);
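
It is then applied with the same from_json call as in the question:

df = df.withColumn("val", functions.from_json(df.col("attributes"), schema));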

You can also define the schema as a literal DDL string (note that the from_json variant that accepts the schema as a Column requires Spark 2.4 or later):

val df2 = df.withColumn(
  "val",
  from_json(
    df.col("attributes"),
    lit("array<struct<id: int, name: string, score: float, snippets: int>>")
  )
)
df2.show(false)
+-----------+------------------------------------------------------+------------------------+
|id         |attributes                                            |val                     |
+-----------+------------------------------------------------------+------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
+-----------+------------------------------------------------------+------------------------+
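
For Java, a sketch of the same DDL-string idea, using the from_json overload that takes a schema string plus an options map (supported for DDL-formatted strings in recent Spark 2.x releases):

import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Pass the schema as a DDL string; no options are needed here.
Dataset<Row> df2 = df.withColumn(
    "val",
    functions.from_json(
        df.col("attributes"),
        "array<struct<id: int, name: string, score: float, snippets: int>>",
        Collections.emptyMap()));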

Or, if you prefer to build the schema programmatically:

val spark_struct = new StructType()
  .add("id", IntegerType, false)
  .add("name", StringType, false)
  .add("score", FloatType, false)
  .add("snippets", IntegerType, false)
val schema = new ArrayType(spark_struct, false)
val df2 = df.withColumn(
  "val",
  from_json(df.col("attributes"), schema)
)

Your original code has two problems: (1) type is a reserved word in Scala, so it is best avoided as a variable name, and (2) you must not instantiate the type objects with new. from_json matches the schema against the singleton type instances (IntegerType, StringType, and so on), and a freshly constructed new IntegerType() is a different object, which is exactly what triggers the scala.MatchError.
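
A corrected version of the original Java snippet, using the DataTypes singletons instead of new instances:

import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Use the shared singletons from DataTypes rather than new IntegerType() etc.
StructType elementType = new StructType()
    .add("id", DataTypes.IntegerType, false)
    .add("name", DataTypes.StringType, false)
    .add("score", DataTypes.FloatType, false)
    .add("snippets", DataTypes.IntegerType, false);

ArrayType schema = new ArrayType(elementType, false);
df = df.withColumn("val", functions.from_json(df.col("attributes"), schema));

Once parsed, you can flatten the array and address the struct fields directly; a minimal sketch (the flat and attr names are just illustrative):

import static org.apache.spark.sql.functions.*;

// One row per array element, with the struct fields promoted to columns.
Dataset<Row> flat = df
    .withColumn("attr", explode(col("val")))
    .select(col("id"), col("attr.name"), col("attr.score"), col("attr.snippets"));
flat.show(false);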
