我正在尝试通过 sparkContext 加载 CSV 文件,并且加载后,我需要执行任何 RDD操作在 CSV文件的必需列上。我能够读取CSV文件并从Javardd获取所需的列。现在,我需要在这些列上执行任何RDD操作。
这是我到目前为止尝试过的。
JavaRDD<String> diskfile = sc.textFile("/Users/hadoop/Downloads/Data_1.csv");
JavaRDD<Object> newRDD = diskfile.cache().map(lines -> Arrays.asList(new String[]{
lines.split(",")[0],
lines.split(",")[1]
}
));
System.out.println(newRDD.collect());
newRDD.collect()
打印CSV数据的0列和第一列。现在我需要在newRDD
上执行任何RDD操作。
预先感谢。
如果您想开始使用Spark RDD转换,则可以转到以下链接:
rdd-basic-escamples
RDD API示例
Word Count使用flatMap, mapToPair, reduceByKey
转换中的Java中的示例:
JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");