将PostgreSQL数据库加载到SchemaRDD



我在PostgreSQL中有100万行和100+列的数据源,我想使用Spark SQL,所以我想转换这个数据源以获得SchemaRDD

Spark SQL 编程指南中介绍了两种方法, 一个是通过反思,这意味着我需要定义:

case class Row(Var1: Int, Var2: String, ...)

这很乏味,因为我有 100+ 列。

另一种方法是"以编程方式指定架构",这意味着我需要定义:

val schema =
  StructType(
    Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...))

这对我来说也很乏味。

实际上,还有另一个问题,因为我使用 JdbcRDD 类加载我的 PostgreSQL 数据库,但我发现我还需要在构造函数JdbcRDD mapRow参数中定义架构,如下所示:

def extractValues(r: ResultSet) = {
  (r.getInt("Var1"), r.getString("Var2"), ...)
}
val dbRDD = new JdbcRDD(sc, createConnection,
  "SELECT * FROM PostgreSQL OFFSET ? LIMIT ?",
  0, 1000000, 1, extractValues)

这个 API 仍然要求我自己创建模式,更糟糕的是我需要重做类似的事情来将这个JdbcRDD转换为 SchemaRDD ,那将是非常笨拙的代码。

所以我想知道这项任务的最佳方法是什么?

您需要支持的数据类型数量有限。 为什么不使用

java.sql.ResultSetMetaData

例如

val rs = jdbcStatement.executeQuery("select * from myTable limit 1")
val rmeta = rs.getMetaData

以读取一行,然后为每个列动态生成所需的结构字段。

您需要一个案例陈述来处理

val myStructFields = for (cx <- 0 until rmeta.getColumnCount) {
       val jdbcType = rmeta.getColumnType(cx)
       } yield StructField(rmeta.getColumnName(cx),jdbcToSparkType(jdbcType))
val mySchema = StructType(myStructFields.toSeq)

其中jdbcToSparkType如下:

  def jdbcToSparkType(jdbcType: Int) = {
    jdbcType match {
       case 4 => InteegerType  
       case 6 => FloatType
        ..
   }  

更新 要生成 RDD[行]:您将遵循类似的模式。 在这种情况下,您将

val rows = for (rs.next) {
    row = jdbcToSpark(rs)
    } yield row
val rowRDD = sc.parallelize(rows)

哪里

def jdbcToSpark(rs: ResultSet) = {
   var rowSeq = Seq[Any]()
   for (cx <- 0 to rs.getMetaData.getColumnCount) {
     rs.getColumnType(cx) match {
         case 4 => rowSeq :+ rs.getInt(cx)
          ..
     }
   }
   Row.fromSeq(rowSeq)
}

然后 价值行

相关内容

  • 没有找到相关文章

最新更新