在cassandra db中插入一个spark RDD,其中包含n个scala类对象



假设我有一个scala类的五个对象,我需要用这五个对象构建一个spark RDD,并将该RDD推送到cassandra表中我的cassandra表"person"有三个字段(pId,pName,pAge)和

val object 1= new myclass(1,"abc",24)
val object2 = new myclass(2,"pqr",23)
val object3 = new myclass(3,"xyz",26)

如何形成这三个对象的组合?下面一行可能吗

val collection=context.parallelize(Seq(object1,object2,object3))

,如果RDD可以制作…我如何将RDD推送到cassandra表以在该表"person"中插入三行

最简单的方法就是创建一个CaseClass,其中类与表中的行相匹配

case class PersonRow(pID: int, pName: String, pAge: Int)
context.parallelize(Seq(
  PersonRow(1, "abc", 24),
  PersonRow(2, "pqr", 23),
  PersonRow(3, "xyz", 26)
)).saveToCassandra("ks","person")

有关更多信息,请参阅Spark Cassandra连接器文档

编辑

mapToRow在Scala代码中是不必要的,因为它基本上是Scala中缺乏隐式的解决方案。SaveToCassandra通常使用隐式的RowWriterFactory, Scala可以通过查看RDD类类型为您实现这一点。在Java中,工厂必须手工创建。

scala> class SomeRandomClass (val k:Int, val v:Int) {
     | def fun() = {println("lots of fun")}
     | val somethingElse:Int = 5
     | }
defined class SomeRandomClass
scala> sc.parallelize(1 to 10).map( x => new SomeRandomClass(x,x)).saveToCassandra("test","test")
scala> sc.cassandraTable("test","test")
res4: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[7] at RDD at CassandraRDD.scala:15
scala> sc.cassandraTable("test","test").collect
res5: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 5, v: 5}, CassandraRow{k: 10, v: 10}, CassandraRow{k: 1, v: 1}, CassandraRow{k: 8, v: 8}, CassandraRow{k: 2, v: 2}, CassandraRow{k: 4, v: 4}, CassandraRow{k: 7, v: 7}, CassandraRow{k: 6, v: 6}, CassandraRow{k: 9, v: 9}, CassandraRow{k: 3, v: 3})

注意,这只适用于类的字段(k和v)到表中列"k和v"之间的映射。

相关内容

  • 没有找到相关文章

最新更新