org.apache.spark.sql.AnalysisException:没有这样的结构字段



我正在使用Java Spark 读取这样的镶木地板文件

Dataset<MyData> myDataDS = sparkSession.read().parquet(myParquetFile)
.as(Encoders.bean(MyData.class));

如果myParquetFile模式与类MyData完全一致,则效果良好。但是,假设我向MyData类添加了一个新字段,例如myId(即使其值为null(,则我需要重新生成镶木地板文件,否则它将引发类似的异常

由:org.apache.spark.sql.AnalysisException引起:没有这样的结构字段myId

有没有一种方法可以跳过空值来通过此错误而不重新生成镶木地板文件?

在读取镶木地板时,默认情况下,Spark使用镶木地板文件中包含的模式来读取数据。例如,与Avro格式相反,模式在镶木地板文件中,如果要更改模式,则必须重新生成镶木地板。

但是,您可以使用方法.schema()将模式提供给Spark的DataFrameReader,而不是让Spark推断模式。在这种情况下,Spark将忽略镶木地板文件中定义的模式,并使用您提供的模式。

因此,解决方案是将从铸造类中提取的模式传递给Spark的DataFrameReader:

Dataset<MyData> myDataDS = sparkSession.read()
.schema(Encoders.bean(MyData.class).schema())
.parquet(myParquetFile)
.as(Encoders.bean(MyData.class))

CCD_ 8没有被抛出;myId";设置为null。

解决这一问题的Brute Force方法-

Dataset<Row> parquet = spark.read()
.parquet(
getClass().getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
".parquet").getPath()
);
parquet.show(false);
/**
* +------+
* |price |
* +------+
* |123.15|
* +------+
*/
StructType schema = Encoders.bean(MyData.class).schema();
List<String> columns = Arrays.stream(parquet.columns()).collect(Collectors.toList());
List<Column> columnList = JavaConverters.asJavaCollectionConverter(schema).asJavaCollection().stream()
.map(f -> (columns.contains(f.name())) ? col(f.name()) : lit(null).cast(f.dataType()).as(f.name()))
.collect(Collectors.toList());
Dataset<MyData> myDataDS =
parquet.select(JavaConverters.asScalaBufferConverter(columnList).asScala()).as(Encoders.bean(MyData.class));
myDataDS.show(false);
myDataDS.printSchema();
/**
* +----+------+
* |myId|price |
* +----+------+
* |null|123.15|
* +----+------+
*
* root
*  |-- myId: string (nullable = true)
*  |-- price: decimal(5,2) (nullable = true)
*/

MyData.java


public class MyData {
private double price;
private String myId;
public String getMyId() {
return myId;
}
public void setMyId(String myId) {
this.myId = myId;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}

最新更新