将 CassandraRDD 转换为 RDD[Array[String]]



我有Cassandra Table,我选择了一些列来对它们执行关联规则。我为每一列创建了案例类以将它们保存在其中。我有类型的列数据

com.datastax.spark.connector.rdd.CassandraRDD[SuperStoreSalesRG]

其中SuperStoreSalesRG是单列的案例类我想将其转换为

RDD[数组[字符串]]

怎么办?!

非常感谢..

这是我到目前为止尝试过的

val test_spark_rdd = sc.cassandraTable("demo1", "orders4") 
case class SuperStoreSalesPC (ProductCategory: String) 
case class SuperStoreSalesCS (CustomerSegment: String) 
case class SuperStoreSalesRG (Region: String) 
val resultPC = test_spark_rdd.select("productcategory").as(SuperStoreSalesP‌​C) 
val resultCS = test_spark_rdd.select("customersegment").as(SuperStoreSalesC‌​S) 
val resultRG = test_spark_rdd.select("region").as(SuperStoreSalesRG)

我想在单独的RDD[Array[String]]中转换每个vals:resultPC,resultCS,resultRG,其中这些vals是列

"productcategory", "customersegment", "region"的三列resultPC, resultCS, resultRG分成三个数据集后,可以执行以下操作将每个datasets转换为RDD[Array[String]]

第一步是使用内置collect_list函数

import org.apache.spark.sql.functions._
val arrayedResultPC = resultPC.withColumn("productcategory", collect_list("productcategory"))

这将产生具有以下schemadatasets

root
 |-- productcategory: array (nullable = true)
 |    |-- element: string (containsNull = true)

您可以对其他两个数据集执行相同的操作

最后一步是将收集的datasets转换为RDD[Array[String]]

val arrayedRdd = arrayedResultPC.rdd.map(_.toSeq(0).asInstanceOf[mutable.WrappedArray[String]])

我希望答案对您有所帮助

相关内容

  • 没有找到相关文章

最新更新