我试图将JDBC ResultSet转换为Spark RDD,并正在寻找一种使用Spark并行特性的有效方法。
以下是我根据此 https://stackoverflow.com/a/32073423/6064131 实施的内容
val rs:ResultSet = stmt .getResultSet
val colCount = rs.getMetaData.getColumnCount
def getRowFromResultSet(resultSet: ResultSet): String ={
var i:Int = 1
var rowStr=""
while(i<=colCount){
rowStr=rowStr+resultSet.getString(i)+delim
i+=1
}
rowStr
}
val resultSetList = Iterator.continually((rs.next(), rs)).takeWhile(_._1).map(r => {
getRowFromResultSet(r._2) // (ResultSet) => (spark.sql.Row)
}).toList
val x = sc.parallelize(resultSetList)
现在的主要问题是它需要更多的时间,我知道所有数据集都是通过一个针眼拉动的。但是有没有更好的方法来实现这一目标?
有些人可能想知道为什么我没有使用内置功能 sqlContext.read.format 来实现这一点,原因是 Spark 在查询周围包装了一个"SELECT * FROM ( (",这会产生复杂查询的问题。 有关详细信息,请参阅链接 Cloudera JDBC 驱动程序的 WITH 子句问题 - 返回列名而不是实际数据
但是有没有更好的方法来实现这一目标?
我不会重新发明轮子。如果您在最近的 Spark 版本(1.6 相当旧(和 JDBC 驱动程序(我猜这是罪魁祸首(中仍然遇到同样的问题,只需CREATE VIEW
并将其用于查询。
另外不要忘记提交错误报告。