假设我有一个scala类的五个对象,我需要用这五个对象构建一个spark RDD,并将该RDD推送到cassandra表中我的cassandra表"person"有三个字段(pId,pName,pAge)和
val object 1= new myclass(1,"abc",24)
val object2 = new myclass(2,"pqr",23)
val object3 = new myclass(3,"xyz",26)
如何形成这三个对象的组合?下面一行可能吗
val collection=context.parallelize(Seq(object1,object2,object3))
,如果RDD可以制作…我如何将RDD推送到cassandra表以在该表"person"中插入三行
最简单的方法就是创建一个CaseClass,其中类与表中的行相匹配
case class PersonRow(pID: int, pName: String, pAge: Int)
context.parallelize(Seq(
PersonRow(1, "abc", 24),
PersonRow(2, "pqr", 23),
PersonRow(3, "xyz", 26)
)).saveToCassandra("ks","person")
有关更多信息,请参阅Spark Cassandra连接器文档
编辑
mapToRow
在Scala代码中是不必要的,因为它基本上是Scala中缺乏隐式的解决方案。SaveToCassandra通常使用隐式的RowWriterFactory
, Scala可以通过查看RDD类类型为您实现这一点。在Java中,工厂必须手工创建。
scala> class SomeRandomClass (val k:Int, val v:Int) {
| def fun() = {println("lots of fun")}
| val somethingElse:Int = 5
| }
defined class SomeRandomClass
scala> sc.parallelize(1 to 10).map( x => new SomeRandomClass(x,x)).saveToCassandra("test","test")
scala> sc.cassandraTable("test","test")
res4: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[7] at RDD at CassandraRDD.scala:15
scala> sc.cassandraTable("test","test").collect
res5: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 5, v: 5}, CassandraRow{k: 10, v: 10}, CassandraRow{k: 1, v: 1}, CassandraRow{k: 8, v: 8}, CassandraRow{k: 2, v: 2}, CassandraRow{k: 4, v: 4}, CassandraRow{k: 7, v: 7}, CassandraRow{k: 6, v: 6}, CassandraRow{k: 9, v: 9}, CassandraRow{k: 3, v: 3})
注意,这只适用于类的字段(k和v)到表中列"k和v"之间的映射。