如何一次处理几个Javardds



我有一个格式的大数据集csv,我需要在此数据集上执行一些RDD操作,而无需使用任何数据范围/数据集API和SparkSQL.通过实现此目标,我加载了每一列数据列进入单独的Javardd。

这是我的示例数据集:

id    name    address   rank
1001  john    NY        68
1002  kevin   NZ        72
1003  steve   WA        64

这是我到目前为止尝试的代码:

JavaRDD<String> diskfile = sc.textFile("/Users/hadoop/Downloads/a.csv");
JavaRDD<String> idRDD=diskfile.flatMap(line -> Arrays.asList(line.split(",")[0]));
JavaRDD<String> nameRDD=diskfile.flatMap(line -> Arrays.asList(line.split(",")[1]));
JavaRDD<String> addressRDD=diskfile.flatMap(line -> Arrays.asList(line.split(",")[2]));

之后,我在addressRDDnameRDD上都应用了reduceByKey

JavaPairRDD<String,Integer> addresspair=address.mapToPair( t -> new Tuple2 <String,Integer>(t,1)).reduceByKey((x, y) -> x + y);
JavaPairRDD<String,Integer> namepair=nameRDD.mapToPair( t -> new Tuple2 <String,Integer>(t,1)).reduceByKey((x, y) -> x + y);

问题:

i在地址pair上应用了sorybyvale(交换密钥值),并获取一个地址值(result),这是最高次数的次数。现在,我需要返回包含地址字段的result

的CSV文件的所有必需列

您可以使用filter如下。

JavaRDD<String> filteredData = diskfile.filter(add -> add.contains(result));
filteredData.foreach(data -> {
            System.out.println(data);
        });

相关内容

  • 没有找到相关文章

最新更新