spark 将 Spark-SQL 转换为 RDD API



Spark SQL对我来说很清楚。但是,我刚刚开始使用Spark的RDD API。由于 spark 将函数应用于并行列,这应该允许我摆脱缓慢的洗牌

def handleBias(df: DataFrame, colName: String, target: String = this.target) = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)
    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
      .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
      .drop("cnt_group")
  }
}

在伪代码中:df foreach column (handleBias(column)因此加载了一个最小的数据框

val input = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  )
  val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

但无法正确映射

val rdd1_inputDf = inputDf.rdd.flatMap { x => {(0 until x.size).map(idx => (idx, x(idx)))}}
      rdd1_inputDf.toDF.show

它失败了

java.lang.ClassNotFoundException: scala.Any
java.lang.ClassNotFoundException: scala.Any

对于此问题中概述的问题,分别 https://github.com/geoHeil/sparkContrastCoding https://github.com/geoHeil/sparkContrastCoding/blob/master/src/main/scala/ColumnParallel.scala 找到一个示例。

当您在DataFrame上调用.rdd时,您会得到一个非强类型的RDD[Row]。如果您希望能够映射元素,则需要在Row上进行模式匹配:

scala> val input = Seq(
     |     (0, "A", "B", "C", "D"),
     |     (1, "A", "B", "C", "D"),
     |     (0, "d", "a", "jkl", "d"),
     |     (0, "d", "g", "C", "D"),
     |     (1, "A", "d", "t", "k"),
     |     (1, "d", "c", "C", "D"),
     |     (1, "c", "B", "C", "D")
     |   )
input: Seq[(Int, String, String, String, String)] = List((0,A,B,C,D), (1,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D))
scala> val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 
inputDf: org.apache.spark.sql.DataFrame = [TARGET: int, col1: string ... 3 more fields]
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val rowRDD = inputDf.rdd
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:27
scala> val typedRDD = rowRDD.map{case Row(a: Int, b: String, c: String, d: String, e: String) => (a,b,c,d,e)}
typedRDD: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[20] at map at <console>:29
scala> typedRDD.keyBy(_._1).groupByKey.foreach{println}
[Stage 7:>                                                          (0 + 0) / 4]
(0,CompactBuffer((A,B,C,D), (d,a,jkl,d), (d,g,C,D)))
(1,CompactBuffer((A,B,C,D), (A,d,t,k), (d,c,C,D), (c,B,C,D)))

否则,您可以使用键入的Dataset

scala> val ds = input.toDS
ds: org.apache.spark.sql.Dataset[(Int, String, String, String, String)] = [_1: int, _2: string ... 3 more fields]
scala> ds.rdd
res2: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[8] at rdd at <console>:30
scala> ds.rdd.keyBy(_._1).groupByKey.foreach{println}
[Stage 0:>                                                          (0 + 0) / 4]
(0,CompactBuffer((0,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D)))
(1,CompactBuffer((1,A,B,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D)))

相关内容

  • 没有找到相关文章

最新更新