I get the following error when running the code below with Spark in Java (local mode):
at Datahub.run(Datahub.java:96)
at Datahub.main(Datahub.java:64)
Caused by: java.lang.IllegalArgumentException: object is not an instance of declaring class
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
The logic reads a CSV file and saves it in Parquet format.
@SuppressWarnings("serial")
public class Datahub implements Serializable {
    private transient SparkConf sparkConf;
    private transient JavaSparkContext sparkContext;
    private transient SQLContext SQLContext;

    public Datahub() {
        sparkConf = new SparkConf().setAppName("Datahub").setMaster("local");
        sparkContext = new JavaSparkContext(sparkConf);
        SQLContext = new SQLContext(sparkContext);
        System.setProperty("hadoop.home.dir", "C:/tools/spark");
    }

    public static void main(String[] args) throws Exception {
        Datahub job = new Datahub();
        job.run("a", "b");
    }

    public void run(String t, String u) {
        JavaRDD<String> pairRDD = sparkContext.textFile("C:/temp/L1_result.csv");
        JavaPairRDD<String, String> rowJavaRDD = pairRDD.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String rec) {
                String[] tokens = rec.split(";");
                String[] vals = new String[tokens.length];
                for (int i = 0; i < tokens.length; i++) {
                    vals[i] = tokens[i];
                }
                return new Tuple2<String, String>(tokens[0], tokens[1]);
            }
        });
        Dataset<Row> fundDF = SQLContext.createDataFrame(rowJavaRDD.values(), funds.class);
        fundDF.printSchema();
        fundDF.show();
        fundDF.write().option("mergeschema", true).parquet("C:/test");
    }
}
Resolved with the following change:
funds b0 = new funds(); b0.setK("k0"); b0.setSomething("sth0");
funds b1 = new funds(); b1.setK("k1"); b1.setSomething("sth1");
List<funds> data = new ArrayList<funds>();
data.add(b0); data.add(b1);
Dataset<Row> fundDf = SQLContext.createDataFrame(data, funds.class);
fundDf.printSchema();
fundDf.write().option("mergeschema", true).parquet("C:/test");
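As background on why the change helps: the exception in the question is the one Java reflection raises when a method is invoked on an object of the wrong class. createDataFrame with a bean class reads the bean's getters reflectively, and in the question the RDD passed in (rowJavaRDD.values()) holds String elements rather than funds objects. The following is a minimal sketch of that mismatch in plain Java, without Spark; the Funds class and the getterRejects helper are hypothetical names used only for illustration.

```java
import java.lang.reflect.Method;

public class ReflectionMismatchDemo {

    /** Hypothetical stand-in for the funds bean from the question. */
    public static class Funds {
        private String k;
        public String getK() { return k; }
        public void setK(String k) { this.k = k; }
    }

    /**
     * Invokes Funds.getK() reflectively on the given target and reports
     * whether reflection rejected the target with IllegalArgumentException.
     */
    static boolean getterRejects(Object target) throws Exception {
        Method getter = Funds.class.getMethod("getK");
        try {
            getter.invoke(target);
            return false; // target was a Funds instance, call succeeded
        } catch (IllegalArgumentException e) {
            return true;  // target is not an instance of the declaring class
        }
    }

    public static void main(String[] args) throws Exception {
        Funds bean = new Funds();
        bean.setK("k0");
        System.out.println(getterRejects(bean));      // false
        System.out.println(getterRejects("k0;sth0")); // true
    }
}
```

In the changed code the list elements are funds instances, so the element type matches the bean class handed to createDataFrame and the getters can be invoked.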
You can read the file and convert it to the funds bean as follows:
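// The file in the question is semicolon-delimited; the column names must match the bean properties (for example via a header row or toDF(...)).
Dataset<funds> fundsDF = SQLContext.read().option("delimiter", ";").csv("C:/temp/L1_result.csv").as(Encoders.bean(funds.class));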
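An alternative that stays closer to the original code would be to turn each line into a bean before calling createDataFrame, so that the element type matches the bean class. The parsing step on its own, without Spark, might look like the sketch below. Funds and parseLine are hypothetical names, and the two-field layout is an assumption taken from the tokens[0] and tokens[1] usage in the question.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class FundsParsingSketch {

    /** Hypothetical stand-in for the funds bean (properties k and something). */
    public static class Funds implements Serializable {
        private static final long serialVersionUID = 1L;
        private String k;
        private String something;
        public String getK() { return k; }
        public void setK(String k) { this.k = k; }
        public String getSomething() { return something; }
        public void setSomething(String something) { this.something = something; }
    }

    /** Turns one semicolon-separated line into a bean; assumes at least two fields. */
    static Funds parseLine(String line) {
        String[] tokens = line.split(";");
        if (tokens.length < 2) {
            throw new IllegalArgumentException("expected at least two fields: " + line);
        }
        Funds f = new Funds();
        f.setK(tokens[0]);
        f.setSomething(tokens[1]);
        return f;
    }

    public static void main(String[] args) {
        List<Funds> data = new ArrayList<>();
        for (String line : new String[] {"k0;sth0", "k1;sth1"}) {
            data.add(parseLine(line));
        }
        Funds second = data.get(1);
        System.out.println(second.getK() + " " + second.getSomething()); // k1 sth1
    }
}
```

With Spark, a function like parseLine could be applied through pairRDD.map(...) to obtain a JavaRDD of beans, which can then be passed to createDataFrame together with the bean class.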