How do I use a JSON array as Kafka records in a streaming query (Java)?



I have looked at many examples of reading JSON data from a Kafka topic. I can do this successfully if I read a single record from the topic per connection, for example:

{"customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
"product": "Super widget",
"price": 10,
"bought_date": "2019-01-01"
}

The following code works for the above use case:

package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {
    public static void main(String[] args) throws StreamingQueryException {
        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
        });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");
        dataset.printSchema();

        Column col = new Column("json");
        Dataset<Row> customers = dataset.select(functions.from_json(col, schema).as("data")).select("data.*");
        customers.printSchema();

        customers.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}

But this strikes me as inefficient, i.e. connecting to Kafka just to get a single record per connection. So it seems to me it would be more efficient to pass a JSON array of the form below, since each JSON array can hold many "records":

[
  {
    "customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
    "product": "Super widget",
    "price": 10,
    "bought_date": "2019-01-01"
  },
  {
    "customer_id": "498443a2-1479-11ea-8d71-362b9e155667",
    "product": "Food widget",
    "price": 4,
    "bought_date": "2019-01-01"
  }
]

The problem is that I cannot unpack the JSON array and process it. The code below fails:

package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {
    public static void main(String[] args) throws StreamingQueryException {
        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
        });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");
        dataset.printSchema();

        Column col = new Column("json");
        Dataset<Row> customers = dataset.select(functions.from_json(col, schema).as("data"));

        Dataset<Row> data = customers.select(functions.explode_outer(functions.explode_outer(new Column("data"))));
        data.printSchema();

        data.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}


Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`data`)' due to data type mismatch: input to function explode should be array or map type, not struct<customer_id:string,product:string,price:int,bought_date:string>;;

Questions:

1) How do I write the code correctly so that it unpacks the JSON array efficiently? I doubt that the approach I took in the failing code above is the best one, but I tried to follow the many examples I have seen of functions.explode() etc.

2) If the failing code miraculously is the right approach, how do I convert the struct into an array or map?

Spark does not pull one record per connection. The Kafka API polls a batch of records at a time.
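As a minimal sketch of that, reusing the reader configuration from the question, the size of each micro-batch can also be capped with the Kafka source's maxOffsetsPerTrigger option (the value 1000 below is an arbitrary example):

// Sketch: same Kafka reader as in the question, with an upper bound on how many
// offsets (records) a single micro-batch may consume per trigger.
Dataset<Row> batch = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "utilization")
        .option("maxOffsetsPerTrigger", "1000") // cap per micro-batch; example value
        .load();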

As far as Kafka best practices go, multiple events should be split into multiple objects rather than stuffed into an array, unless they actually need to be associated; for example, you would have a "cart" record that contains a list of "items" for an order.
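As a rough sketch of that kind of modelling (the cart_id and items field names are invented for the example), the nested schema could look like this:

// Hypothetical "cart" record: one object per order, holding a nested list of items.
StructType itemSchema = new StructType(new StructField[]{
        new StructField("product", DataTypes.StringType, false, Metadata.empty()),
        new StructField("price", DataTypes.IntegerType, false, Metadata.empty())
});
StructType cartSchema = new StructType(new StructField[]{
        new StructField("cart_id", DataTypes.StringType, false, Metadata.empty()),
        new StructField("items", new ArrayType(itemSchema, false), false, Metadata.empty())
});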

To make your code work, the schema must be an ArrayType (not a struct or a map):

StructType schema = new StructType(new StructField[]{
        new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
        new StructField("product", DataTypes.StringType, false, Metadata.empty()),
        new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
});

ArrayType arrSchema = new ArrayType(schema, false);

Then use the array schema when calling from_json.

For completeness, the code below applies the suggestion above and produces the expected result:

package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {
    public static void main(String[] args) throws StreamingQueryException {
        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty())
        });
        ArrayType arrayType = new ArrayType(schema, false);

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");
        dataset.printSchema();

        Column col = new Column("json");
        // Parse the JSON array, then explode it into one row per element;
        // explode() names its output column "col", hence the "col.*" below.
        Column data = functions.from_json(col, arrayType).as("data");
        Column explode = functions.explode(data);
        Dataset<Row> customers = dataset.select(explode).select("col.*");
        customers.schema();

        customers.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}

-------------------------------------------
Batch: 77
-------------------------------------------
+-----------+-------------+-----+-----------+
|customer_id|      product|price|bought_date|
+-----------+-------------+-----+-----------+
|   d6315a00| Super widget|   10|2019-01-01 |
|   d6315cd0|  Food widget|    4| 2019-01-01|
|   d6315e2e|  Bike widget|   10| 2019-01-01|
|   d631614e|Garage widget|    4| 2019-01-01|
+-----------+-------------+-----+-----------+
