如何在Spark SQL Java中转换CSV类型字符串到数据框架?



我用Spark结构化流API编写Spark Java客户端代码。这些代码从Kafka中提取CSV类型字符串

SparkSession spark = SparkSession.builder().master("local[*]").appName("KafkaMongoStream").getOrCreate();

Dataset<Row> df = spark.read().format("kafka").option("kafka.bootstrap.servers", "localhost:9092"))
.option("subscribe", "topicForMongoDB")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)");

df.show();

返回结果成功。这些代码打印CSV类型字符串。

+--------------------+
|               value|
+--------------------+
|realtime_start,re...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|

然后我尝试将这些字符串转换为Spark SQL中的Spark数据框架。首先,下面的代码是Java POJO类

public class EntityMongoDB implements Serializable {
private Date date;
private float value;
private String id;
private String title;
private String state;
private String frequency_short;
private String units_short;
private String seasonal_adjustment_short;

private static StructType structType = DataTypes.createStructType(new StructField[] {

DataTypes.createStructField("date", DataTypes.DateType, false),
DataTypes.createStructField("value", DataTypes.FloatType, false),
DataTypes.createStructField("id", DataTypes.StringType, false),
DataTypes.createStructField("title", DataTypes.StringType, false),
DataTypes.createStructField("state", DataTypes.StringType, false),
DataTypes.createStructField("frequency_short", DataTypes.StringType, false),
DataTypes.createStructField("units_short", DataTypes.StringType, false),
DataTypes.createStructField("seasonal_adjustment_short", DataTypes.StringType, false)
});

public static StructType getStructType() {
return structType;
}
}

然后编写代码将CSV类型字符串转换为dataframe

Dataset<Row> dfs = df.select(from_json(col("value"), EntityMongoDB.getStructType())
.as("entityMongoDB"))
.selectExpr("entityMongoDB.date", "entityMongoDB.value", "entityMongoDB.id", 
"entityMongoDB.title", "entityMongoDB.state", "entityMongoDB.frequency_short", 
"entityMongoDB.units_short", "entityMongoDB.seasonal_adjustment_short").toDF();
dfs.show();
dfs.printSchema();

打印的模式是正确的。

|-- date: date (nullable = true)
|-- value: float (nullable = true)
|-- id: string (nullable = true)
|-- title: string (nullable = true)
|-- state: string (nullable = true)
|-- frequency_short: string (nullable = true)
|-- units_short: string (nullable = true)
|-- seasonal_adjustment_short: string (nullable = true)

但是生成的列全是空值:

+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|date|value|  id|title|state|frequency_short|units_short|seasonal_adjustment_short|
+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|

我认为数据框架的模式生成是正确的,但是提取数据部分有一些问题。

您在value列中的字符串不是有效的JSON,因此from_json在这里不起作用。

对于Spark 3+,您可以使用@mck在评论中指出的from_csv:

Dataset<Row> dfs = df.select(from_csv(col("value"), EntityMongoDB.getStructType())
.as("entityMongoDB"))
.selectExpr("entityMongoDB.*").toDF(); 

对于3之前的Spark版本,您可以通过逗号将值split,然后将结果数组转换为多个列:

Dataset<Row> dfs = df.select(split(col("value"), ",").as("values"))
.select(IntStream.range(0, 7).map(i -> col("values").getItem(i)).toArray())
.toDF("date", "value", "id", "title", "state", "frequency_short", "units_short", "seasonal_adjustment_short"); 

此外,似乎您的值中有列名,您可以过滤掉那一行。

相关内容

  • 没有找到相关文章

最新更新