我在PostgreSQL中有100万行和100+列的数据源,我想使用Spark SQL,所以我想转换这个数据源以获得SchemaRDD
。
Spark SQL 编程指南中介绍了两种方法, 一个是通过反思,这意味着我需要定义:
case class Row(Var1: Int, Var2: String, ...)
这很乏味,因为我有 100+ 列。
另一种方法是"以编程方式指定架构",这意味着我需要定义:
val schema =
StructType(
Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...))
这对我来说也很乏味。
实际上,还有另一个问题,因为我使用 JdbcRDD
类加载我的 PostgreSQL
数据库,但我发现我还需要在构造函数JdbcRDD
mapRow
参数中定义架构,如下所示:
def extractValues(r: ResultSet) = {
(r.getInt("Var1"), r.getString("Var2"), ...)
}
val dbRDD = new JdbcRDD(sc, createConnection,
"SELECT * FROM PostgreSQL OFFSET ? LIMIT ?",
0, 1000000, 1, extractValues)
这个 API 仍然要求我自己创建模式,更糟糕的是我需要重做类似的事情来将这个JdbcRDD
转换为 SchemaRDD
,那将是非常笨拙的代码。
所以我想知道这项任务的最佳方法是什么?
您需要支持的数据类型数量有限。 为什么不使用
java.sql.ResultSetMetaData
例如
val rs = jdbcStatement.executeQuery("select * from myTable limit 1")
val rmeta = rs.getMetaData
以读取一行,然后为每个列动态生成所需的结构字段。
您需要一个案例陈述来处理
val myStructFields = for (cx <- 0 until rmeta.getColumnCount) {
val jdbcType = rmeta.getColumnType(cx)
} yield StructField(rmeta.getColumnName(cx),jdbcToSparkType(jdbcType))
val mySchema = StructType(myStructFields.toSeq)
其中jdbcToSparkType如下:
def jdbcToSparkType(jdbcType: Int) = {
jdbcType match {
case 4 => InteegerType
case 6 => FloatType
..
}
更新 要生成 RDD[行]:您将遵循类似的模式。 在这种情况下,您将
val rows = for (rs.next) {
row = jdbcToSpark(rs)
} yield row
val rowRDD = sc.parallelize(rows)
哪里
def jdbcToSpark(rs: ResultSet) = {
var rowSeq = Seq[Any]()
for (cx <- 0 to rs.getMetaData.getColumnCount) {
rs.getColumnType(cx) match {
case 4 => rowSeq :+ rs.getInt(cx)
..
}
}
Row.fromSeq(rowSeq)
}
然后 价值行