I'm trying to do this:
private final String charset8859 = "ISO-8859-1";
private final String charsetUtf8 = "UTF-8";
private String partnerFile8859 = "src/test/resources/D10410.QUALSCSV";

public SparkSession getOrCreateSparkSession() {
    SparkConf conf = new SparkConf().setAppName("SparkSample").setMaster("local[*]");
    SparkSession sparkSession = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();
    return sparkSession;
}
public void withCharset2(JavaSparkContext context, String location, String charset) throws UnsupportedEncodingException
{
    if (Charset.forName(charset) == DEFAULT_CHARSET) {
        JavaRDD<String> result = context.textFile(location, 1);
    } else {
        // val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
        // return new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
        // can't pass a Charset object here because it's not serializable
        // TODO: maybe use mapPartitions instead?
        JavaPairRDD<LongWritable, Text> rdd = context.hadoopFile(location, TextInputFormat.class, LongWritable.class, Text.class);
        rdd.map(pair -> new String(pair._2.getBytes(), 0, pair._2.getLength(), charset));
        rdd.collect();
    }
}
@Test
public void getTextWithCharset() throws UnsupportedEncodingException, FileNotFoundException {
    SparkSession sparkSession = getOrCreateSparkSession();
    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
    withCharset2(sparkContext, partnerFile8859, charset8859);
}
But it throws this error:
ERROR TaskSetManager: Task 1.0 in stage 0.0 (TID 1) had a not serializable result: org.apache.hadoop.io.LongWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.LongWritable, value: 1166)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (1166,"53";"S?cio sem Capital"))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 44); not retrying
I'm trying to port https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/util/TextFile.scala to the Java Spark API.
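For context on the error: the first version calls rdd.map(...) but never uses its result, and collect() then runs on the JavaPairRDD<LongWritable, Text> itself. Hadoop's LongWritable and Text implement Writable but not java.io.Serializable, so Spark cannot ship the task results back to the driver. A minimal sketch of the distinction, reusing the same hadoopFile call and parameters as withCharset2 (the variable names here are illustrative, not from the original code):

JavaPairRDD<LongWritable, Text> raw =
        context.hadoopFile(location, TextInputFormat.class, LongWritable.class, Text.class);
// raw.collect();  // fails: the results contain LongWritable/Text, which are not java.io.Serializable
JavaRDD<String> decoded = raw.map(pair ->
        new String(pair._2.getBytes(), 0, pair._2.getLength(), charset));
decoded.collect();  // works: the collected elements are plain Strings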
I solved it like this:
public void withCharset2(JavaSparkContext context, String location, String charset) throws UnsupportedEncodingException
{
    if (Charset.forName(charset) == DEFAULT_CHARSET) {
        JavaRDD<String> result = context.textFile(location, 1);
    } else {
        // val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
        // return new String(pair._2.getBytes(), 0, pair._2.getLength(), charset);
        // can't pass a Charset object here because it's not serializable
        // TODO: maybe use mapPartitions instead?
        JavaPairRDD<LongWritable, Text> rdd = context.hadoopFile(location, TextInputFormat.class, LongWritable.class, Text.class);
        JavaRDD<Text> values = rdd.values();
        JavaRDD<String> textRDD = values.map(text -> {
            String s = new String(text.getBytes(), 0, text.getLength(), charset);
            System.out.printf("textRDD map: %s | original string: %s%n", s, text.toString());
            // textRDD map: "08";"Conselheiro de Administração" | original string: "08";"Conselheiro de Administra??o"
            return s;
        });
        textRDD.collect();
    }
}
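As a follow-up to the TODO in the comments, the decoding could also be done per partition, so the Charset is resolved once per partition instead of once per record, while still returning only serializable Strings. This is a minimal sketch, assuming Spark 2.x (where FlatMapFunction returns an Iterator); the method name withCharsetPartitions and the local variables are mine, not from the original code, and it additionally needs java.util.ArrayList, java.util.List imports:

public JavaRDD<String> withCharsetPartitions(JavaSparkContext context, String location, String charset) {
    JavaPairRDD<LongWritable, Text> rdd =
            context.hadoopFile(location, TextInputFormat.class, LongWritable.class, Text.class);
    return rdd.values().mapPartitions(it -> {
        // Resolve the Charset on the executor; only the charset name (a String) is captured by the lambda.
        Charset cs = Charset.forName(charset);
        List<String> lines = new ArrayList<>();
        while (it.hasNext()) {
            Text text = it.next();
            // Copy the bytes into a String before advancing, since Hadoop reuses Text instances.
            lines.add(new String(text.getBytes(), 0, text.getLength(), cs));
        }
        return lines.iterator();
    });
}

Returning the JavaRDD<String> instead of calling collect() inside the method also makes it easier to assert on the decoded lines in the test.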