如何使用Spark和Scala创建RDD[Map(Int,Int)]?



我在Java中有以下简单的代码。此代码创建并用 0 值填充Map

Map<Integer,Integer> myMap = new HashMap<Integer,Integer>();
for (int i=0; i<=20; i++) { myMap.put(i, 0); }

我想使用 Spark 和 Scala 创建一个类似的 RDD。我尝试了这种方法,但它返回我RDD[(Any) => (Any,Int)]而不是RDD[Map(Int,Int)].我做错了什么?

val data = (0 to 20).map(_ => (_,0))
val myMapRDD = sparkContext.parallelize(data)

您正在创建的内容tuples.相反,您需要创建Mapparallelize

如下所示
val data = (0 to 20).map(x => Map(x -> 0))         //data: scala.collection.immutable.IndexedSeq[scala.collection.immutable.Map[Int,Int]] = Vector(Map(0 -> 0), Map(1 -> 0), Map(2 -> 0), Map(3 -> 0), Map(4 -> 0), Map(5 -> 0), Map(6 -> 0), Map(7 -> 0), Map(8 -> 0), Map(9 -> 0), Map(10 -> 0), Map(11 -> 0), Map(12 -> 0), Map(13 -> 0), Map(14 -> 0), Map(15 -> 0), Map(16 -> 0), Map(17 -> 0), Map(18 -> 0), Map(19 -> 0), Map(20 -> 0))
val myMapRDD = sparkContext.parallelize(data)       //myMapRDD: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]] = ParallelCollectionRDD[0] at parallelize at test.sc:19

在 Scala 中,(0 to 20).map(_ => (_, 0))不会编译,因为它placeholder语法无效。 我相信您可能正在寻找如下所示的内容:

val data = (0 to 20).map( _->0 )

这将生成一个键值对列表,实际上只是一个占位符简写:

val data = (0 to 20).map( n => n->0 )
// data: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector(
//   (0,0), (1,0), (2,0), (3,0), (4,0), (5,0), (6,0), (7,0), (8,0), (9,0), (10,0),
//   (11,0), (12,0), (13,0), (14,0), (15,0), (16,0), (17,0), (18,0), (19,0), (20,0)
// )

RDD 是一个不可移动的集合(例如SeqArray) 的数据。 要创建Map[Int,Int]的 RDD,您需要在Map内扩展data,而又被放置在Seq集合中:

val rdd = sc.parallelize(Seq(Map(data: _*)))
rdd.collect
// res1: Array[scala.collection.immutable.Map[Int,Int]] = Array(
//   Map(0 -> 0, 5 -> 0, 10 -> 0, 14 -> 0, 20 -> 0, 1 -> 0, 6 -> 0, 9 -> 0, 13 -> 0, 2 -> 0, 17 -> 0,
//       12 -> 0, 7 -> 0, 3 -> 0, 18 -> 0, 16 -> 0, 11 -> 0, 8 -> 0, 19 -> 0, 4 -> 0, 15 -> 0)
// )

请注意,这个RDD只包含一个Map,当然你可以在RDD中组装任意数量的Map

val rdd2 = sc.parallelize(Seq(
Map((0 to 4).map( _->0 ): _*),
Map((5 to 9).map( _->0 ): _*),
Map((10 to 14).map( _->0 ): _*),
Map((15 to 19).map( _->0 ): _*)
))

你不能parallelizeMap,因为parallelize需要Seq。你可以实现的是创建一个RDD[(Int, Int)],但它不会强制执行键的唯一性。要按键执行操作,您可以利用PairRDDFunctions,尽管存在此限制,但最终对您的用例很有用。

让我们至少尝试获得RDD[(Int, Int)].

在将map应用于范围时,您使用了稍微"错误"的语法。

_占位符可以有不同的含义,具体取决于上下文。代码片段中混淆的两个含义是:

一个
  • 不会使用的匿名函数参数的占位符(如在(_ => 42)中,一个忽略其输入并始终返回42的函数)
  • 匿名函数中参数的位置占位符(如(_, 42)接受一个参数并返回一个元组的函数,其中第一个元素是输入,第二个元素是数字42)

上面的示例是简化的,不考虑类型推断,因为它们只想指出代码片段中混淆的_占位符的两个含义。

第一步是使用以下两个函数之一来创建将成为地图一部分的对,或者

a => (a, 0)

(_, 0)

并行化后,您可以得到RDD[(Int, Int)],如下所示:

val pairRdd = sc.parallelize((0 to 20).map((_, 0)))

我相信这里值得注意的是,本地集合上的映射将急切地执行并绑定到驱动程序的资源,而您可以通过先并行化集合,然后在RDD上映射配对创建函数来获得相同的最终结果。

现在,如前所述,您没有分布式映射,而是键值对的集合,其中不强制执行键唯一性。但是你可以使用PairRDDFunctions无缝地使用这些值,您可以通过导入org.apache.spark.rdd.RDD.rddToPairRDDFunctions自动获得这些值(或者不必在spark-shell中做任何事情,因为已经为你完成了导入),这将利用 Scala 的隐式转换来装饰你的RDD

import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
pairRdd.mapValues(_ + 1).foreach(println)

将打印以下内容

(10,1)
(11,1)
(12,1)
(13,1)
(14,1)
(15,1)
(16,1)
(17,1)
(18,1)
(19,1)
(20,1)
(0,1)
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)
(8,1)
(9,1)

您可以在官方文档中了解有关使用 RDD API 使用键值对的更多信息。

相关内容

最新更新