我有两个rdd's
,即val tab_a: RDD[(String, String)]
和val tab_b: RDD[(String, String)]
我正在为这些数据集使用cogroup
,例如:
val tab_c = tab_a.cogroup(tab_b).collect.toArray
val updated = tab_c.map { x =>
{
//somecode
}
}
我正在为地图函数使用tab_c
共同分组的值,它适用于小型数据集,但在大型数据集的情况下,它会抛出Out Of Memory exception
.
我尝试将最终值转换为RDD,但没有运气相同的错误
val newcos = spark.sparkContext.parallelize(tab_c)
1.如何使用Cogroup处理大型数据集?
2.我们可以保留共分组值吗?
法典
val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)
val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)
val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()
var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)
var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)
var srcs: String = Seq("")(0)
var destt: String = Seq("")(0)
val updated = cos.map { x =>
{
val key = x._1
val value = x._2
srcs = value._1.mkString(",")
destt = value._2.mkString(",")
if (srcs.equalsIgnoreCase(destt) == false && destt != "") {
srcmis :+= srcs
destmis :+= destt
}
if (srcs == "") {
extraindest :+= destt.mkString("")
}
if (destt == "") {
extrainsrc :+= srcs.mkString("")
}
}
}
代码更新:
val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2)
// tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)}
{2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..
错误:
ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
谢谢
当你使用collect()
时,你基本上是在告诉spark将所有结果数据移回主节点,这很容易产生瓶颈。此时你不再使用 Spark,只是一台机器中的一个普通数组。
要触发计算,只需使用需要每个节点上的数据的东西,这就是执行器位于分布式文件系统之上的原因。例如saveAsTextFile()
.
以下是一些基本示例。
请记住,这里的整个目标(也就是说,如果你有大数据)是将代码移动到你的数据并在那里计算,而不是将所有数据带到计算中。
TL;DR 不要collect
.
为了安全地运行此代码,在没有其他假设(工作器节点的平均要求可能明显更小)的情况下,每个节点(驱动程序和每个执行程序)都需要的内存大大超过所有数据的总内存要求。
如果你要在Spark之外运行它,你只需要一个节点。因此,Spark在这里没有任何好处。
但是,如果您跳过collect.toArray
并对数据分布做出一些假设,则可以很好地运行它。