如何使用Apache Spark Scala获得大型CSV / RDD[Array[double]]中所有列的直方图



我正在尝试使用Spark Scala计算CSV文件中所有列的直方图。

我发现DoubleRDDFunctions支持直方图。因此,我编写了如下代码来获取所有列的直方图。

  1. 获取列数
  2. 创建每个列的RDD[double],并使用DoubleRDDFunctions计算每个RDD的直方图

    var columnIndexArray = Array.tabulate(rdd.first().length) (_ * 1)
    val histogramData = columnIndexArray.map(columns => {
      rdd.map(lines => lines(columns)).histogram(6) 
    })
    

这是一个好方法吗?谁能提出一些更好的方法来解决这个问题?

不完全更好,但另一种方法是将RDD转换为DataFrame并使用histogram_numeric UDF。

示例数据:

import scala.util.Random
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{callUDF, lit, col}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
Random.setSeed(1)
val ncol = 5
val rdd = sc.parallelize((1 to 1000).map(
  _ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble))
))
val schema = StructType(
  (1 to ncol).map(i => StructField(s"x$i", DoubleType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("df")
查询:

val nBuckets = 3
val columns = df.columns.map(
  c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c))
val histograms = df.select(columns: _*)
histograms.printSchema
// root
//  |-- x1: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
//  |-- x2: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
//  |-- x3: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
//  |-- x4: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
//  |-- x5: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
histograms.select($"x1").collect()
// Array([WrappedArray([0.16874313309969038,334.0],
//   [0.513382068667877,345.0], [0.8421388886903808,321.0])])

(scala api)转换,countByValue应该做你想做的

因此,例如,为RDD中的第一列生成直方图数据:

val histCol1 = RDD.map(record => record.col_1).countByValue()
在上面的表达式

中,record只是引用RDD中的一个数据行,一个case类的实例,它有一个字段col_1

histCol1将返回一个哈希表(Scala Map),其中键是第1列(col_1)中的唯一值,值显然是每个唯一值

的频率

最新更新