我在运行 GraphX 时遇到问题
val adjGraph= adjGraph_CC.vertices
.flatMap { case (id, (compID, adjSet)) => (mapMsgGen(id, compID, adjSet)) }
// mapMsgGen will generate a list of msgs each msg has the form K->V
.reduceByKey((fst, snd) =>mapMsgMerg(fst, snd)).collect
// mapMsgMerg will merge each two msgs passed to it
我期望 reduceByKey 做的是按键 (K) 对 flatMap 的整个输出进行分组,并使用提供的函数处理每个键 (K) 的值列表 (Vs)。
正在发生的事情是flatMap的每个输出(使用函数mapMsgGen),这是一个K->V对(通常不是同一个K)的列表,使用reduceByKey函数mapMsgMerg立即处理,并在整个flatMap完成之前。
需要一些澄清我不知道出了什么问题,还是我理解了flatMap和reduceByKey错误?
问候
马希尔
在开始reduceByKey
之前,无需生成flatMap
的全部输出。事实上,如果您不使用 flatMap
的中间输出,最好不要生成它并可能节省一些内存。
如果您的flatMap
输出包含'k' -> v1
和'k' -> v2
的列表,则没有理由等到生成整个列表后再将v1
和v2
传递给mapMsgMerge
。一旦这两个元组被输出v1
并且v2
可以组合为mapMsgMerge(v1, v2)
和v1
,如果不使用中间列表v2
则丢弃。
我不太了解 Spark 调度程序的细节,无法说这是否是保证的行为,但它确实看起来像是原始论文所谓的操作"流水线"的一个实例。