Spark groupBy vs repartition plus mapPartitions



我的数据集是~20百万行,需要~8 GB的RAM。我正在使用 2 个执行器运行我的作业,每个执行器 10 GB RAM,每个执行器 2 个内核。由于进一步的转换,应一次性缓存所有数据。

我需要根据 4 个字段减少重复项(选择任何重复项)。两个选项:使用groupBy和使用repartitionmapPartitions。第二种方法允许您指定分区数,并且在某些情况下可以因此执行得更快,对吗?

您能否解释一下哪个选项具有更好的性能?两个选项的 RAM 消耗是否相同?

使用groupBy

dataSet
    .groupBy(col1, col2, col3, col4)
    .agg(
        last(col5),
        ...
        last(col17)
    );

使用repartitionmapPartitions

dataSet.sqlContext().createDataFrame(
    dataSet
        .repartition(parallelism, seq(asList(col1, col2, col3, col4)))
        .toJavaRDD()
        .mapPartitions(DatasetOps::reduce),
    SCHEMA
);
private static Iterator<Row> reduce(Iterator<Row> itr) {
    Comparator<Row> comparator = (row1, row2) -> Comparator
        .comparing((Row r) -> r.getAs(name(col1)))
        .thenComparing((Row r) -> r.getAs(name(col2)))
        .thenComparingInt((Row r) -> r.getAs(name(col3)))
        .thenComparingInt((Row r) -> r.getAs(name(col4)))
        .compare(row1, row2);
    List<Row> list = StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.ORDERED), false)
        .collect(collectingAndThen(toCollection(() -> new TreeSet<>(comparator)), ArrayList::new));
    return list.iterator();
}

第二种方法允许您指定分区数,并且在某些情况下可以因此执行得更快,对吗?

没有。这两种方法都允许您指定分区数 - 在第一种情况下到spark.sql.shuffle.partitions

spark.conf.set("spark.sql.shuffle.partitions", parallelism)

但是,如果重复项很常见,则第二种方法本质上效率较低,因为它首先洗牌,然后减少,跳过映射端缩减(换句话说,它是另一个按键分组)。如果重复很少见,但这不会有太大区别。

附带说明一下,Dataset已经提供了dropDuplicates变体,这些变体采用一组列,first/last在这里不是特别有意义(请参阅如何选择每个组的第一行中的讨论?

相关内容

  • 没有找到相关文章

最新更新