我尝试使用SparkSQL(v.1.3.0)来访问PostgreSQL数据库。在这个数据库中,我有一个表
CREATE TABLE test (
id bigint,
values double precision[]
);
为了访问表,我使用
val sparkConf = new SparkConf().setAppName("TestRead").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val jdbcDF = sqlContext.load("jdbc", Map(
"url" -> "jdbc:postgresql://...",
"dbtable" -> "schema.test",
"user" -> "...",
"password" -> "..."))
sqlContext.sql("SELECT * FROM schema.test")
但是,每次我尝试访问包含此数组的表时,我都会得到一个java.sql.SQLException: Unsupported type 2003
.
我在 Spark 测试代码中找到了一个示例,该示例在 Spark 中为二维点创建 UDT(请参阅 ExamplePointUDT.scala)。但是,我不明白我怎么可能使用此代码。
这至少可以在 pyspark 中通过在查询内部进行转换来实现。不要让不受支持的类型达到 Spark,将它们投射到您的数据库中,然后在获取表后重新投射它们。
我不确定语法是否正确,但它会像这样:
val query_table = "(SELECT id, CAST(values AS TEXT) FROM schema.test) AS casted_table"
val jdbcDF = sqlContext.load("jdbc", Map(
"url" -> "jdbc:postgresql://...",
"dbtable" -> query_table,
"user" -> "...",
"password" -> "..."))
jdbcDF.map(x => (x.id, x.values.toArray))
我很确定没有.toArray
可以将字符串表示形式转换回数组,它只是占位符代码。但现在只是正确解析它的问题。
当然,这只是一个补丁,但它有效。