I implemented the PageRank example in Java using the newer Dataset API. When I benchmark my code against the sample that uses the older RDD API, mine takes 186 seconds while the baseline needs only 109 seconds. What is causing the difference? (Side note: is it normal for Spark to need hundreds of seconds even though the database only contains a handful of entries?)
My code:
Dataset<Row> outLinks = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "storagepage_outlinks", props);
Dataset<Row> page = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "pages", props);
outLinks = page.join(outLinks, page.col("id").equalTo(outLinks.col("storagepage_id")));
outLinks = outLinks.distinct().groupBy(outLinks.col("url")).agg(collect_set("outlinks")).cache();
Dataset<Row> ranks = outLinks.map(row -> new Tuple2<>(row.getString(0), 1.0), Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "rank");
for (int i = 0; i < iterations; i++) {
    Dataset<Row> joined = outLinks.join(ranks, new Set.Set1<>("url").toSeq());
    Dataset<Row> contribs = joined.flatMap(row -> {
        List<String> links = row.getList(1);
        double rank = row.getDouble(2);
        return links
                .stream()
                .map(s -> new Tuple2<>(s, rank / links.size()))
                .collect(Collectors.toList()).iterator();
    }, Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "num");
    Dataset<Tuple2<String, Double>> reducedByKey =
            contribs.groupByKey(r -> r.getString(0), Encoders.STRING())
                    .mapGroups((s, iterator) -> {
                        double sum = 0;
                        while (iterator.hasNext()) {
                            sum += iterator.next().getDouble(1);
                        }
                        return new Tuple2<>(s, sum);
                    }, Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE()));
    ranks = reducedByKey.map(t -> new Tuple2<>(t._1, .15 + t._2 * .85),
            Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "rank");
}
ranks.show();
The sample code that uses RDDs (adapted to read from my database):
Dataset<Row> outLinks = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "storagepage_outlinks", props);
Dataset<Row> page = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "pages", props);
outLinks = page.join(outLinks, page.col("id").equalTo(outLinks.col("storagepage_id")));
outLinks = outLinks.distinct().groupBy(outLinks.col("url")).agg(collect_set("outlinks")).cache(); // TODO: play with this cache
JavaPairRDD<String, Iterable<String>> links = outLinks.javaRDD().mapToPair(row -> new Tuple2<>(row.getString(0), row.getList(1)));
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);
// Calculates and updates URL ranks continuously using PageRank algorithm.
for (int current = 0; current < 20; current++) {
    // Calculates URL contributions to the rank of other URLs.
    JavaPairRDD<String, Double> contribs = links.join(ranks).values()
            .flatMapToPair(s -> {
                int urlCount = size(s._1());
                List<Tuple2<String, Double>> results = new ArrayList<>();
                for (String n : s._1) {
                    results.add(new Tuple2<>(n, s._2() / urlCount));
                }
                return results.iterator();
            });
    // Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey((x, y) -> x + y).mapValues(sum -> 0.15 + sum * 0.85);
}
// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
for (Tuple2<?,?> tuple : output) {
    System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
}
tl;dr It's probably the good old "avoid groupByKey" issue.
It is hard to say for sure, but your Dataset code is the equivalent of groupByKey:

groupByKey(...).mapGroups(...)

which means it shuffles the full data first and only reduces it afterwards. Your RDD version uses reduceByKey, which should lower the shuffle size by applying a local (map-side) reduction. If you want the two pieces of code to be roughly equivalent, you should rewrite groupByKey(...).mapGroups(...) as groupByKey(...).reduceGroups(...).
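For concreteness, here is a minimal sketch of that rewrite against the contribs Dataset from the question (column 0 = url, column 1 = num). It assumes Spark 2.1+ for KeyValueGroupedDataset.mapValues, and the explicit MapFunction/ReduceFunction casts are only there to keep the Java lambdas unambiguous:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;

// Extract the double value per row before grouping, then let Spark combine
// values pairwise instead of materialising whole groups as mapGroups does.
Dataset<Tuple2<String, Double>> reducedByKey =
        contribs
                .groupByKey((MapFunction<Row, String>) r -> r.getString(0), Encoders.STRING())
                .mapValues((MapFunction<Row, Double>) r -> r.getDouble(1), Encoders.DOUBLE())
                .reduceGroups((ReduceFunction<Double>) (a, b) -> a + b);

Since reduceGroups still yields a Dataset<Tuple2<String, Double>>, the line that recomputes ranks afterwards can stay exactly as it is.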
Another possible candidate is configuration. spark.sql.shuffle.partitions defaults to 200 and is used for Dataset aggregations. If, as you say,

the database only contains a handful of entries

then this is serious overkill. RDDs will use spark.default.parallelism or a value based on the parent data, which is usually far more modest.
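If that is your situation, lowering it for this job is an easy experiment; the value 16 below is only an illustration, not a recommendation:

// Either when building the session ...
SparkSession spark = SparkSession.builder()
        .config("spark.sql.shuffle.partitions", "16")
        .getOrCreate();

// ... or at runtime, before the Dataset aggregations run.
spark.conf().set("spark.sql.shuffle.partitions", "16");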