我正在做一些datastax vm的练习。
给出了一个cassandrat的缩影,我将使用Spark API功能而不是Cassandra-Query-fimctions进行一些过滤并检索前5个元素。
我正在做以下操作:
val cassRdd = sc.cassandraTable("killr_video", "videos_by_year_title")
val cassRdd2 = cassRdd.filter(r=>r.getString("title") >= "T")
println("1" : + cassRdd2)
println("2" : + cassRdd2.count)
println("3" : + cassRdd2.take(5))
println("4" : + cassRdd2.take(5).count)
导致:
- 1:mappartitionsrdd [185]在过滤器上:19
- 2:2250
- 3:[lcom.datastax.spark.connector.cassandrarow;@56fd2e09
- 4:编译错误(丢失特征中方法计数的参数traversableonce
我所期望的:
- 1:和2:按预期工作
- 3:仅返回一排?我希望RDD为5 Cassandra行
- 4:这不是3:之后的RDD计数看起来它是某种cassandrarow-count-hethod,我不是打算致电
dataStax给出的解决方案使用RDD并在其上进行MAP转换,仅在标题上进行标题,并且在该新标题rdd上进行过滤和随身携带。
好吧,有效,但是我不明白,为什么take不在卡桑德拉罗(RDD(上工作,或者可能是什么结果。
val cassRdd2 = cassRdd.map(r=>r.getString("title")).filter(t >= "T")
我认为任何RDD上的收费命令(无论其内容如何(始终相同,采用第一个X元素,导致具有X元素大小的完全相同类型的新RDD。
<</p>rdd.take(n)
实际上将n
元素移至驱动程序并将其返回为数组,请参见Scaladoc。如果要打印它们:
println("3" : + cassRdd2.take(5).toList)
或cassRdd2.take(5).foreach(println)
。最后一行不起作用,因为该方法称为 length
(或 size
(数组:
println("4" : + cassRdd2.take(5).length)
我混合了一些东西:
take
是一个动作,我不应该期望rdd(但是是什么?有什么二进制?它有名字吗?某种集合?如果适合字符串或int,也可以单一的值(
我不应该在RDD上使用count
,而是应该在Java-Collections上使用size
。顺便说一句,count
也是一个动作,在像dump这样的动作之后使用动作,但它是如此直观。