在 Spark 中解析缺少的列值 CSV



我有一个巨大的数据库导入表,有~270列,我创建了一个JavaRDD并使用它来填充数据帧。

场景:如果CSV文件中的所有字段都存在,那么一切都很好。但是,如果 CSV 中有一些空白字段,例如。

Value1,,,,,,value7,,,,, 

然后在写入 Hive 表存储的镶木地板时由于索引出界异常(列>行大小)而失败。我不想使用spark-csv库。

我尝试使用过滤器但没有用,因为即使 CSV 中没有数据,我也需要所有列。如果我缺少什么,请告诉我。

JavaRDD<String> tLogRDD =jsc.textFile(dataFile);    
        String schema=tLogRDD.first();
            List<StructField> columns =new ArrayList<StructField>();
            for(String fieldName: schema.split(","))
            {               
            columns.add(DataTypes.createStructField(fieldName,DataTypes.StringType,false));
            }                   
        StructType schemaStructType = DataTypes.createStructType(columns);
        System.out.println("XXXXXXXXXXXX-Row Read Start-XXXXXXXXXXXXXXX");
        @SuppressWarnings("serial")
        JavaRDD<Row> rowRDD = tLogRDD.map(
                  new Function<String, Row>() {
                      @Override
                    public Row call(String record) throws Exception {
                      String[] fields = record.split(",");
                      Object[] fields_converted = fields;
                      return RowFactory.create(fields_converted);                 
                    }
                  });
        //apply schema to rows  
        DataFrame tLogfDataFrame=hContext.createDataFrame(rowRDD, schemaStructType);
        System.out.println("DataFrame Constructed Successfully");
        tLogfDataFrame.show(10);
        tLogfDataFrame.save("C:/Users/Documents/1001.csv","parquet");

你可以使用 csv 阅读器 从 spark,,喜欢:

sparkSession.read()
.format("csv")
.option("header","true")
.option("inferSchema","true")
.load(--file path--)

这更容易,并且有一组选项。

相关内容

  • 没有找到相关文章

最新更新