我在Java中有以下简单的代码。此代码创建并用 0 值填充Map
。
Map<Integer,Integer> myMap = new HashMap<Integer,Integer>();
for (int i=0; i<=20; i++) { myMap.put(i, 0); }
我想使用 Spark 和 Scala 创建一个类似的 RDD。我尝试了这种方法,但它返回我RDD[(Any) => (Any,Int)]
而不是RDD[Map(Int,Int)]
.我做错了什么?
val data = (0 to 20).map(_ => (_,0))
val myMapRDD = sparkContext.parallelize(data)
您正在创建的内容tuples
.相反,您需要创建Map
并parallelize
val data = (0 to 20).map(x => Map(x -> 0)) //data: scala.collection.immutable.IndexedSeq[scala.collection.immutable.Map[Int,Int]] = Vector(Map(0 -> 0), Map(1 -> 0), Map(2 -> 0), Map(3 -> 0), Map(4 -> 0), Map(5 -> 0), Map(6 -> 0), Map(7 -> 0), Map(8 -> 0), Map(9 -> 0), Map(10 -> 0), Map(11 -> 0), Map(12 -> 0), Map(13 -> 0), Map(14 -> 0), Map(15 -> 0), Map(16 -> 0), Map(17 -> 0), Map(18 -> 0), Map(19 -> 0), Map(20 -> 0))
val myMapRDD = sparkContext.parallelize(data) //myMapRDD: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]] = ParallelCollectionRDD[0] at parallelize at test.sc:19
在 Scala 中,(0 to 20).map(_ => (_, 0))
不会编译,因为它placeholder
语法无效。 我相信您可能正在寻找如下所示的内容:
val data = (0 to 20).map( _->0 )
这将生成一个键值对列表,实际上只是一个占位符简写:
val data = (0 to 20).map( n => n->0 )
// data: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector(
// (0,0), (1,0), (2,0), (3,0), (4,0), (5,0), (6,0), (7,0), (8,0), (9,0), (10,0),
// (11,0), (12,0), (13,0), (14,0), (15,0), (16,0), (17,0), (18,0), (19,0), (20,0)
// )
RDD 是一个不可移动的集合(例如Seq
,Array
) 的数据。 要创建Map[Int,Int]
的 RDD,您需要在Map
内扩展data
,而又被放置在Seq
集合中:
val rdd = sc.parallelize(Seq(Map(data: _*)))
rdd.collect
// res1: Array[scala.collection.immutable.Map[Int,Int]] = Array(
// Map(0 -> 0, 5 -> 0, 10 -> 0, 14 -> 0, 20 -> 0, 1 -> 0, 6 -> 0, 9 -> 0, 13 -> 0, 2 -> 0, 17 -> 0,
// 12 -> 0, 7 -> 0, 3 -> 0, 18 -> 0, 16 -> 0, 11 -> 0, 8 -> 0, 19 -> 0, 4 -> 0, 15 -> 0)
// )
请注意,这个RDD只包含一个Map
,当然你可以在RDD中组装任意数量的Map
。
val rdd2 = sc.parallelize(Seq(
Map((0 to 4).map( _->0 ): _*),
Map((5 to 9).map( _->0 ): _*),
Map((10 to 14).map( _->0 ): _*),
Map((15 to 19).map( _->0 ): _*)
))
你不能parallelize
Map
,因为parallelize
需要Seq
。你可以实现的是创建一个RDD[(Int, Int)]
,但它不会强制执行键的唯一性。要按键执行操作,您可以利用PairRDDFunctions
,尽管存在此限制,但最终对您的用例很有用。
让我们至少尝试获得RDD[(Int, Int)]
.
在将map
应用于范围时,您使用了稍微"错误"的语法。
_
占位符可以有不同的含义,具体取决于上下文。代码片段中混淆的两个含义是:
- 不会使用的匿名函数参数的占位符(如在
(_ => 42)
中,一个忽略其输入并始终返回42
的函数) - 匿名函数中参数的位置占位符(如
(_, 42)
接受一个参数并返回一个元组的函数,其中第一个元素是输入,第二个元素是数字42
)
上面的示例是简化的,不考虑类型推断,因为它们只想指出代码片段中混淆的_
占位符的两个含义。
第一步是使用以下两个函数之一来创建将成为地图一部分的对,或者
a => (a, 0)
或
(_, 0)
并行化后,您可以得到RDD[(Int, Int)]
,如下所示:
val pairRdd = sc.parallelize((0 to 20).map((_, 0)))
我相信这里值得注意的是,本地集合上的映射将急切地执行并绑定到驱动程序的资源,而您可以通过先并行化集合,然后在RDD
上映射配对创建函数来获得相同的最终结果。
现在,如前所述,您没有分布式映射,而是键值对的集合,其中不强制执行键唯一性。但是你可以使用PairRDDFunctions
无缝地使用这些值,您可以通过导入org.apache.spark.rdd.RDD.rddToPairRDDFunctions
自动获得这些值(或者不必在spark-shell
中做任何事情,因为已经为你完成了导入),这将利用 Scala 的隐式转换来装饰你的RDD
。
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
pairRdd.mapValues(_ + 1).foreach(println)
将打印以下内容
(10,1)
(11,1)
(12,1)
(13,1)
(14,1)
(15,1)
(16,1)
(17,1)
(18,1)
(19,1)
(20,1)
(0,1)
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)
(8,1)
(9,1)
您可以在官方文档中了解有关使用 RDD API 使用键值对的更多信息。