我使用映射步骤创建了一个包含我需要的一些对象的JavaRDD对象。基于这些对象,我想创建一个包含一些统计信息的全局哈希图,但我无法弄清楚要使用哪个RDD操作。起初我以为reduce是解决方案,但后来我发现你必须返回相同类型的对象。我对减少项目不感兴趣,而是对收集所有机器的所有统计数据感兴趣(它们可以单独计算,然后up_添加。
例如:我有一个包含整数数组等对象的RDD,我想通过将每个整数放入哈希表中来计算每个整数在数组中出现的次数。每台计算机都应计算自己的哈希表,然后将它们全部放在驱动程序中的一个位置。
当您认为最终想要获得 Map 时,您需要将 RDD 中的记录转换为键值对,并使用 reduceByKey
.
您的特定示例听起来与著名的字数统计示例完全相同(请参阅此处的第一个示例),只是您想计算对象中数组中的整数,而不是计算句子中的单词(String)。在 Scala 中,这将转化为:
import org.apache.spark.rdd.RDD
import scala.collection.Map
class Example {
case class MyObj(ints: Array[Int], otherStuff: String)
def countInts(input: RDD[MyObj]): Map[Int, Int] = {
input
.flatMap(_.ints) // flatMap maps each record into several records - in this case, each int becomes a record
.map(i => (i, 1)) // turn into key-value map, with preliminary value 1 for each key
.reduceByKey(_ + _) // aggregate values by key
.collectAsMap() // collects data into a Map
}
}
通常,你应该让 Spark 以分布式方式执行尽可能多的操作,并尽可能延迟收集到内存中 - 如果你在化简之前收集值,通常会耗尽内存,除非你的数据集足够小,可以开始(在这种情况下,你并不真正需要 Spark)。
编辑:这是Java中的相同代码(更长,但相同...
static class MyObj implements Serializable {
Integer[] ints;
String otherStuff;
}
Map<Integer, Integer> countInts(JavaRDD<MyObj> input) {
return input
.flatMap(new FlatMapFunction<MyObj, Integer>() {
@Override
public Iterable<Integer> call(MyObj myObj) throws Exception {
return Arrays.asList(myObj.ints);
}
}) // flatMap maps each record into several records - in this case, each int becomes a record
.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<>(integer, 1);
}
}) // turn into key-value map, with preliminary value 1
.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}) // aggregate values by key
.collectAsMap(); // collects data into a Map
}