我在HDFS上有一个大型数据集(480k +记录(,我想将每条记录拆分为\t。
这是我的代码:
static JavaPairRDD<String, String> load(JavaSparkContext sc, String path) throws Exception {
JavaRDD<String> wholeData = sc.textFile(path);
JavaPairRDD<String, String> wholeDataRDD = wholeData.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) throws Exception {
List<String> line = Arrays.asList(Pattern.compile("t").split(s));
return new Tuple2<String, String>(line.get(0), line.get(1));
}
});
return wholeDataRDD;
}
上面的代码可以工作,但它太慢了(约9分钟(。
感谢您的任何建议
几件事:
-
正如注释中@Bohemian提到的,您应该编写静态字段
static Pattern splitter = Pattern.compile("t");
然后在匿名类中String[] line = splitter.split(s));
-
可能并行化是错误的。这意味着您可能有太多或太少的分区,或者执行程序太多或太少。如评论中所述,太多的执行器可能是一个瓶颈 - 在您的情况下,200 个执行器是非常高的数字。我建议按
RDD.getNumPartitions()
检查分区号 - 可能不超过 200。请记住,每个执行器都是一个新的 JVM,将数据传输到其他 JVM 是非常慢的 IO 操作。如果您不知道应该有多少执行者 - 请不要担心!火花作为动态分配