我正在尝试使用Spark JDBC从SAS IOM读取。问题是 SAS JDBC 驱动程序有点奇怪,所以我需要创建自己的方言:
object SasDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:sasiom")
override def quoteIdentifier(colName: String): String = """ + colName + ""n"
}
然而,这还不够。SAS 区分了列标签(= 人类可读的名称(和列名称(= 您在 SQL 查询中使用的名称(,但似乎 Spark 在架构发现中使用列标签而不是名称,请参阅下面的 JdbcUtils 摘录:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L293
while (i < ncols) {
val columnName = rsmd.getColumnLabel(i + 1)
这会导致 SQL 错误,因为它尝试在生成的 SQL 代码中使用人类可读的列名。
要使 SAS IOM JDBC 正常工作,这需要是 getColumnName 而不是 getColumnLabel。有没有办法在方言中指定这一点?我真的找不到一种方法来钩入这个,除了包装整个com.sas.rio.MVADriver和resultsetmeta
弗兰克
与此同时,我找到了如何做到这一点,所以只是发布以供参考。诀窍是注册您自己的方言,如下所示。
此外,SAS 用空格填充所有 varchar 列,所以我修剪了所有字符串列。
def getSasTable(sparkSession: org.apache.spark.sql.SparkSession, tablename: String): org.apache.spark.sql.DataFrame = {
val host : String = "dwhid94.msnet.railb.be";
val port : String = "48593";
val props = new java.util.Properties();
props.put("user", CredentialsStore.getUsername("sas"))
props.put("password", CredentialsStore.getPassword("sas"))
props.setProperty("driver", "com.sas.rio.MVADriver")
val sasconurl : String = String.format("jdbc:sasiom://%s:%s", host, port);
object SasDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:sasiom")
override def quoteIdentifier(colName: String): String = """ + colName + ""n"
}
JdbcDialects.registerDialect(SasDialect)
val df = sparkSession.read
.option("url", sasconurl)
.option("driver", "com.sas.rio.MVADriver")
.option("dbtable", tablename)
.option("user",CredentialsStore.getUsername("sas"))
.option("password",CredentialsStore.getPassword("sas"))
.option("fetchsize",100)
.format("jdbc")
.load()
val strippedDf = sparkSession.createDataFrame(df.rdd.map(r => Row(r.toSeq.map(x => x match {case s: String => s.trim; case _ => x}): _*)), df.schema);
return strippedDf;
}