apache spark -排序RDD元素



对于一个研究项目,我尝试对RDD中的元素进行排序。我用了两种不同的方法。

在第一种方法中,我在RDD上应用了mapPartitions()函数,以便它对RDD的内容进行排序,并提供一个结果RDD,其中包含排序列表作为RDD中的唯一记录。然后,我应用了一个reduce函数,它基本上合并了排序列表。

我在包含30个节点的EC2集群上运行这些实验。我使用spark ec2脚本设置它。数据文件存储在HDFS。

在第二种方法中,我使用了Spark中的sortBy方法。

我在这里找到的美国人口普查数据(100MB)上执行了这些操作

一行像这样

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000.

我根据CSV中的第25个值进行排序。这一行是1758.14。

我注意到sortBy的性能比其他方法差。这是预期的情景吗?如果是的话,为什么mapPartitions()和reduce()不是默认的排序方法呢?

这是我的实现

public static void sortBy(JavaSparkContext sc){
        JavaRDD<String> rdd = sc.textFile("/data.txt",32);
        long start = System.currentTimeMillis();
        rdd.sortBy(new Function<String, Double>(){
            @Override
                public Double call(String v1) throws Exception {
                      // TODO Auto-generated method stub
                  String [] arr = v1.split(",");
                  return Double.parseDouble(arr[24]);   
                }
        }, true, 9).collect();
        long end = System.currentTimeMillis();
        System.out.println("SortBy: " + (end - start));
  }
public static void sortList(JavaSparkContext sc){
        JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8);
        long start = System.currentTimeMillis();
        JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){
        @Override
        public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
            throws Exception {
          // TODO Auto-generated method stub
          LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
          while(t.hasNext()){       
            String s = t.next();
            String arr1[] = s.split(",");
            Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s);
            lines.add(t1);
          }
          Collections.sort(lines, new IncomeComparator());
          LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
          list.add(lines);
          return list;
        }
        });
        rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>(){
        @Override
        public LinkedList<Tuple2<Double, String>> call(
                LinkedList<Tuple2<Double, String>> a,
                LinkedList<Tuple2<Double, String>> b) throws Exception {
          // TODO Auto-generated method stub
          LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>();
          while (a.size() > 0 && b.size() > 0) {
            if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0)
              result.add(a.poll());
            else
              result.add(b.poll());
          }
          while (a.size() > 0)
            result.add(a.poll());
          while (b.size() > 0)
            result.add(b.poll());
          return result;
        }
        });     
        long end = System.currentTimeMillis();
        System.out.println("MapPartitions: " + (end - start));
  }

Collect()是一个主要的瓶颈,因为它将所有结果返回给驱动程序。
它产生IO hit &额外的网络流量到单个源(在本例中是驱动程序)。它还会阻碍其他操作。

代替第一个sortBy()代码段中的collect(),尝试执行并行操作,如saveAsTextFile(tmp),而不是使用sc.textFile(tmp)回读。

另一个sortBy()代码段利用mapPartitions()和reduce()并行api -所以整个工作是并行完成的。这似乎是端到端性能时间差异的原因。

请注意,您的发现并不一定意味着所有机器的执行时间总和更差。

最新更新