我有一个巨大的数据库导入表,有~270列,我创建了一个JavaRDD并使用它来填充数据帧。
场景:如果CSV文件中的所有字段都存在,那么一切都很好。但是,如果 CSV 中有一些空白字段,例如。
Value1,,,,,,value7,,,,,
然后在写入 Hive 表存储的镶木地板时由于索引出界异常(列>行大小)而失败。我不想使用spark-csv
库。
我尝试使用过滤器但没有用,因为即使 CSV 中没有数据,我也需要所有列。如果我缺少什么,请告诉我。
JavaRDD<String> tLogRDD =jsc.textFile(dataFile);
String schema=tLogRDD.first();
List<StructField> columns =new ArrayList<StructField>();
for(String fieldName: schema.split(","))
{
columns.add(DataTypes.createStructField(fieldName,DataTypes.StringType,false));
}
StructType schemaStructType = DataTypes.createStructType(columns);
System.out.println("XXXXXXXXXXXX-Row Read Start-XXXXXXXXXXXXXXX");
@SuppressWarnings("serial")
JavaRDD<Row> rowRDD = tLogRDD.map(
new Function<String, Row>() {
@Override
public Row call(String record) throws Exception {
String[] fields = record.split(",");
Object[] fields_converted = fields;
return RowFactory.create(fields_converted);
}
});
//apply schema to rows
DataFrame tLogfDataFrame=hContext.createDataFrame(rowRDD, schemaStructType);
System.out.println("DataFrame Constructed Successfully");
tLogfDataFrame.show(10);
tLogfDataFrame.save("C:/Users/Documents/1001.csv","parquet");
你可以使用 csv 阅读器 从 spark,,喜欢:
sparkSession.read()
.format("csv")
.option("header","true")
.option("inferSchema","true")
.load(--file path--)
这更容易,并且有一组选项。