Spark RDD在CSV文件上功能



我正在尝试通过 sparkContext 加载 CSV 文件,并且加载后,我需要执行任何 RDD操作 CSV文件的必需列上。我能够读取CSV文件并从Javardd获取所需的列。现在,我需要在这些列上执行任何RDD操作。

这是我到目前为止尝试过的。

JavaRDD<String> diskfile = sc.textFile("/Users/hadoop/Downloads/Data_1.csv");
  JavaRDD<Object> newRDD = diskfile.cache().map(lines -> Arrays.asList(new String[]{
          lines.split(",")[0], 
          lines.split(",")[1]
        }
          ));
 System.out.println(newRDD.collect());

newRDD.collect()打印CSV数据的0列和第一列。现在我需要在newRDD上执行任何RDD操作。

预先感谢。

如果您想开始使用Spark RDD转换,则可以转到以下链接:

rdd-basic-escamples

RDD API示例

Word Count使用flatMap, mapToPair, reduceByKey转换中的Java中的示例:

JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");

相关内容

  • 没有找到相关文章

最新更新