如何在生成的RDD中返回多条记录,映射输入RDD中的一条记录



经过多次联接后,我得到了一个RDD,其中包含以下记录:

(Int, ((Int, Option[Iterable[Int]]), Option[Iterable[Int]]))

它是:

(id_of_client, ((id_of_order, products_in_order), all_products_client_ever_bought)

我需要将其转换为(Int,Int,Boolean):

(id_of_order, all_products_client_ever_bought._1, was_this_product_in_this_order)
(id_of_order, all_products_client_ever_bought._2, was_this_product_in_this_order)
(id_of_order, all_products_client_ever_bought._3, was_this_product_in_this_order)
...

结果RDD中的记录应该与输入RDD的所有记录的all_products_client_ever_bough中的项目一样多。所以我正在映射我的输入RDD,RDD.map(transform_df(_))

def transform_df(row: (Int, ((Int, Option[Iterable[Int]]), Option[Iterable[Int]]))) = {
//(order_id, user_product_id, if_order_contains_product)
val order_products = row._2._1._2.get.toList
val user_products = row._2._2.get
for (product_id <- user_products) {
(row._2._1._1, product_id, order_products.contains(product_id))
}
}

结果,我得到了与输入长度相同但具有空元组的RDD。如何转换RDD?

您是对的,您需要"分解"您的数据集,即将每个记录映射到1个以上的记录。对于RDD API,以及大多数函数式编程语言,您需要使用flapMap函数(爆炸用于数据帧)。

有关如何使用平面图的更多详细信息,请参阅scala中的映射图和平面图。基本上,对于类型A的每个记录,您映射一个类型Seq[B]的序列,然后得到一个类型RDD[B]的RDD,其中所有内容都被展开。

Spark中另一个非常方便的方法是flatMapValue,它处理pairRDD(键值RDD),只对值进行平坦化。

在您的示例中,您可以首先将RDD映射到只包含您需要的内容并且更便于操作的内容。

rdd.map{ case (id_of_client, ((id_of_order, products_in_order), all_products) 
=> id_of_order -> (products_in_order.get.toSet, all_products.get) }

请注意,BTW使用模式匹配而不是_1.2.2表示法是一种很好的做法,可以使代码更可读。我还将订单中的产品转换为一个集合,因为之后我们需要对其提出请求。

然后,您只需要使用flatMapValues即可获得所需内容。

.flatMapValues{ case (products_in_order, all_products) =>
all_products.map(p => p -> product_in_order.contains(p)) }
.map { case (a,(b,c)) => (a,b,c) }

最后一行只是将结果转换为您想要的结果。

最新更新