我有Cassandra Table,我选择了一些列来对它们执行关联规则。我为每一列创建了案例类以将它们保存在其中。我有类型的列数据
com.datastax.spark.connector.rdd.CassandraRDD[SuperStoreSalesRG]
其中SuperStoreSalesRG是单列的案例类我想将其转换为
RDD[数组[字符串]]
怎么办?!
非常感谢..
这是我到目前为止尝试过的
val test_spark_rdd = sc.cassandraTable("demo1", "orders4")
case class SuperStoreSalesPC (ProductCategory: String)
case class SuperStoreSalesCS (CustomerSegment: String)
case class SuperStoreSalesRG (Region: String)
val resultPC = test_spark_rdd.select("productcategory").as(SuperStoreSalesPC)
val resultCS = test_spark_rdd.select("customersegment").as(SuperStoreSalesCS)
val resultRG = test_spark_rdd.select("region").as(SuperStoreSalesRG)
我想在单独的RDD[Array[String]]中转换每个vals:resultPC,resultCS,resultRG,其中这些vals是列
将"productcategory", "customersegment", "region"
的三列resultPC, resultCS, resultRG
分成三个数据集后,可以执行以下操作将每个datasets
转换为RDD[Array[String]]
第一步是使用内置collect_list
函数
import org.apache.spark.sql.functions._
val arrayedResultPC = resultPC.withColumn("productcategory", collect_list("productcategory"))
这将产生具有以下schema
的datasets
root
|-- productcategory: array (nullable = true)
| |-- element: string (containsNull = true)
您可以对其他两个数据集执行相同的操作
最后一步是将收集的datasets
转换为RDD[Array[String]]
val arrayedRdd = arrayedResultPC.rdd.map(_.toSeq(0).asInstanceOf[mutable.WrappedArray[String]])
我希望答案对您有所帮助