我有pyspark.rdd.PipelinedRDD
(Rdd1)
.当我做Rdd1.collect()
时,它给出的结果如下。
[(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
(1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
(2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
(3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
现在我想使用 collect(( 方法将 pyspark.rdd.PipelinedRDD 转换为数据帧,而无需
我的最终数据框应如下所示。 df.show()
应该是这样的:
+----------+-------+-------------------+
|CId |IID |Score |
+----------+-------+-------------------+
|10 |4 |2.9996439803387602 |
|10 |5 |1.6767412921625855 |
|10 |3 |3.616726727464709 |
|1 |4 |-1.5271512313750577|
|1 |5 |1.9665475696370045 |
|1 |3 |2.016527311459324 |
|2 |4 |4.033642544526678 |
|2 |5 |3.1517805604906313 |
|2 |3 |6.230272144805092 |
|3 |4 |2.9757316477407443 |
|3 |5 |-1.5689126834176417|
|3 |3 |-0.3924680103722977|
+----------+-------+-------------------+
我可以在下一个应用收集、迭代和最后的数据框来实现到 rdd 的转换。
但现在我想不使用任何collect()
方法将pyspark.rdd.PipelinedRDD
转换为数据帧。
请让我知道如何实现这一点?
你想在这里做两件事:1. 扁平化数据2. 将其放入数据帧中
一种方法如下:
首先,让我们把字典展平:
rdd2 = Rdd1.flatMapValues(lambda x : [ (k, x[k]) for k in x.keys()])
收集数据时,你会得到这样的结果:
[(10, (3, 3.616726727464709)), (10, (4, 2.9996439803387602)), ...
然后我们可以格式化数据并将其转换为数据帧:
rdd2.map(lambda x : (x[0], x[1][0], x[1][1]))
.toDF(("CId", "IID", "Score"))
.show()
这给你这个:
+---+---+-------------------+
|CId|IID| Score|
+---+---+-------------------+
| 10| 3| 3.616726727464709|
| 10| 4| 2.9996439803387602|
| 10| 5| 1.6767412921625855|
| 1| 3| 2.016527311459324|
| 1| 4|-1.5271512313750577|
| 1| 5| 1.9665475696370045|
| 2| 3| 6.230272144805092|
| 2| 4| 4.033642544526678|
| 2| 5| 3.1517805604906313|
| 3| 3|-0.3924680103722977|
| 3| 4| 2.9757316477407443|
| 3| 5|-1.5689126834176417|
+---+---+-------------------+
有一个更简单、更优雅的解决方案,避免使用 python lambda 表达式,如@oli答案,它依赖于 Spark DataFrames 的explode
,完全符合您的要求。它也应该更快,因为没有必要使用 python lambda 两次。见下文:
from pyspark.sql.functions import explode
# dummy data
data = [(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
(1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
(2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
(3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
# create your rdd
rdd = sc.parallelize(data)
# convert to spark data frame
df = rdd.toDF(["CId", "Values"])
# use explode
df.select("CId", explode("Values").alias("IID", "Score")).show()
+---+---+-------------------+
|CId|IID| Score|
+---+---+-------------------+
| 10| 3| 3.616726727464709|
| 10| 4| 2.9996439803387602|
| 10| 5| 1.6767412921625855|
| 1| 3| 2.016527311459324|
| 1| 4|-1.5271512313750577|
| 1| 5| 1.9665475696370045|
| 2| 3| 6.230272144805092|
| 2| 4| 4.033642544526678|
| 2| 5| 3.1517805604906313|
| 3| 3|-0.3924680103722977|
| 3| 4| 2.9757316477407443|
| 3| 5|-1.5689126834176417|
+---+---+-------------------+
<</div>
div class="one_answers"> 这就是使用 scala 的方法
val Rdd1 = spark.sparkContext.parallelize(Seq(
(10, Map(3 -> 3.616726727464709, 4 -> 2.9996439803387602, 5 -> 1.6767412921625855)),
(1, Map(3 -> 2.016527311459324, 4 -> -1.5271512313750577, 5 -> 1.9665475696370045)),
(2, Map(3 -> 6.230272144805092, 4 -> 4.033642544526678, 5 -> 3.1517805604906313)),
(3, Map(3 -> -0.3924680103722977, 4 -> 2.9757316477407443, 5 -> -1.5689126834176417))
))
val x = Rdd1.flatMap(x => (x._2.map(y => (x._1, y._1, y._2))))
.toDF("CId", "IId", "score")
输出:
+---+---+-------------------+
|CId|IId|score |
+---+---+-------------------+
|10 |3 |3.616726727464709 |
|10 |4 |2.9996439803387602 |
|10 |5 |1.6767412921625855 |
|1 |3 |2.016527311459324 |
|1 |4 |-1.5271512313750577|
|1 |5 |1.9665475696370045 |
|2 |3 |6.230272144805092 |
|2 |4 |4.033642544526678 |
|2 |5 |3.1517805604906313 |
|3 |3 |-0.3924680103722977|
|3 |4 |2.9757316477407443 |
|3 |5 |-1.5689126834176417|
+---+---+-------------------+
希望你能转换为 pyspark。
确保首先创建 Spark 会话:
sc = SparkContext()
spark = SparkSession(sc)
当我试图解决这个确切的问题时,我找到了这个答案。
"PipelinedRDD"对象在PySpark中没有属性"toDF">