How to print or retrieve data from org.apache.spark.rdd.RDD[((Any, Any), Iterable[org.apache.spark.sql.Row])]



I have data of type org.apache.spark.rdd.RDD[((Any, Any), Iterable[org.apache.spark.sql.Row])].
How can I print or retrieve the data?

My code is as follows:

val sessionsDF = Seq(("day1","user1","session1", 100.0),
  ("day1","user1","session2",200.0),
  ("day2","user1","session3",300.0),
  ("day2","user1","session4",400.0),
  ("day2","user1","session4",99.0)
).toDF("day","userId","sessionId","purchaseTotal").toDF()
val groupByData=sessionsDF.groupBy(x=>(x.get(0),x.get(1)))
val filterData=groupByData.filter(x=>x._1._1=="day1").map(x=>x._2)

The above code returns org.apache.spark.rdd.RDD[((Any, Any), Iterable[org.apache.spark.sql.Row])].

In your first step, you have an extra .toDF(). The correct version is as follows:

val sessionsDF = Seq(("day1","user1","session1", 100.0),
  ("day1","user1","session2",200.0),
  ("day2","user1","session3",300.0),
  ("day2","user1","session4",400.0),
  ("day2","user1","session4",99.0)
).toDF("day","userId","sessionId","purchaseTotal")

In the second step, you missed the .rdd, so the actual second step is

val groupByData=sessionsDF.rdd.groupBy(x=>(x.get(0),x.get(1)))

which has the datatype you mentioned in your question:

scala> groupByData
res12: org.apache.spark.rdd.RDD[((Any, Any), Iterable[org.apache.spark.sql.Row])] = ShuffledRDD[9] at groupBy at <console>:25

To view the groupByData rdd, you can simply use foreach (note: on a real cluster, foreach(println) prints on the executors; in local mode it prints to your console):

groupByData.foreach(println)

which would give you

((day1,user1),CompactBuffer([day1,user1,session1,100.0], [day1,user1,session2,200.0]))
((day2,user1),CompactBuffer([day2,user1,session3,300.0], [day2,user1,session4,400.0], [day2,user1,session4,99.0]))
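For readers without a Spark shell handy, the same grouping shape can be reproduced with plain Scala collections. This is only an illustrative sketch: tuples stand in for org.apache.spark.sql.Row, and Seq.groupBy mirrors the ((key), values) shape of RDD.groupBy.

```scala
// Plain-Scala sketch of the grouping step: tuples stand in for sql.Row,
// and Seq.groupBy mirrors RDD.groupBy's ((key), values) shape
val sessions = Seq(
  ("day1", "user1", "session1", 100.0),
  ("day1", "user1", "session2", 200.0),
  ("day2", "user1", "session3", 300.0),
  ("day2", "user1", "session4", 400.0),
  ("day2", "user1", "session4", 99.0)
)

// key on (day, userId), like x => (x.get(0), x.get(1)) on Rows
val grouped = sessions.groupBy(x => (x._1, x._2))
grouped.foreach(println)
```

The output shows the two keys ("day1","user1") and ("day2","user1"), each paired with its list of rows, just as the CompactBuffer output above does.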

Now, your third step is to filter the data that has day1 as the value of the dataframe's day column, and you are taking only the values of the grouped rdd data.

val filterData=groupByData.filter(x=>x._1._1=="day1").map(x=>x._2)

The return datatype of this step is

scala> filterData
res13: org.apache.spark.rdd.RDD[Iterable[org.apache.spark.sql.Row]] = MapPartitionsRDD[11] at map at <console>:27

You can view the data with foreach as above:

filterData.foreach(println)

which would give you

CompactBuffer([day1,user1,session1,100.0], [day1,user1,session2,200.0])
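Continuing the plain-Scala sketch (again with tuples instead of Rows, not the Spark API itself), the filter-and-drop-keys step looks like this:

```scala
// Plain-Scala sketch of filter(x => x._1._1 == "day1").map(x => x._2):
// keep only groups whose key's first element is "day1", then drop the keys
val sessions = Seq(
  ("day1", "user1", "session1", 100.0),
  ("day1", "user1", "session2", 200.0),
  ("day2", "user1", "session3", 300.0),
  ("day2", "user1", "session4", 400.0),
  ("day2", "user1", "session4", 99.0)
)

val grouped  = sessions.groupBy(x => (x._1, x._2))
val filtered = grouped.filter(x => x._1._1 == "day1").map(x => x._2)
filtered.foreach(println)
```

As in the Spark version, the result is a single group holding the two day1 rows, with the (day, userId) key discarded.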

You can see that the returned datatype is RDD[Iterable[org.apache.spark.sql.Row]], so to print each value you can use map as
filterData.map(x => x.map(y => println(y(0), y(1), y(2), y(3)))).collect

which would give you

(day1,user1,session1,100.0)
(day1,user1,session2,200.0)

If you do only

filterData.map(x => x.map(y => println(y(0), y(3)))).collect

you would get

(day1,100.0)
(day1,200.0)
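The positional access y(0), y(3) above works because Row supports index-based lookup. As a plain-Scala analogue (an illustration, not the Row API), productElement gives the same positional selection on a tuple:

```scala
// Positional field access on a plain tuple, analogous to y(0)/y(3) on a sql.Row
val row = ("day1", "user1", "session1", 100.0)
println((row.productElement(0), row.productElement(3)))
```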

I hope the answer helps.
