Spark read JDBC from SAS IOM



我正在尝试使用Spark JDBC从SAS IOM读取。问题是 SAS JDBC 驱动程序有点奇怪,所以我需要创建自己的方言:

object SasDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:sasiom")
override def quoteIdentifier(colName: String): String = """ + colName + ""n"
}

然而,这还不够。SAS 区分了列标签(= 人类可读的名称(和列名称(= 您在 SQL 查询中使用的名称(,但似乎 Spark 在架构发现中使用列标签而不是名称,请参阅下面的 JdbcUtils 摘录:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L293

while (i < ncols) {
val columnName = rsmd.getColumnLabel(i + 1)

这会导致 SQL 错误,因为它尝试在生成的 SQL 代码中使用人类可读的列名。

要使 SAS IOM JDBC 正常工作,这需要是 getColumnName 而不是 getColumnLabel。有没有办法在方言中指定这一点?我真的找不到一种方法来钩入这个,除了包装整个com.sas.rio.MVADriver和resultsetmeta

弗兰克

与此同时,我找到了如何做到这一点,所以只是发布以供参考。诀窍是注册您自己的方言,如下所示。

此外,SAS 用空格填充所有 varchar 列,所以我修剪了所有字符串列。

def getSasTable(sparkSession: org.apache.spark.sql.SparkSession, tablename: String): org.apache.spark.sql.DataFrame = {                                       
val host : String = "dwhid94.msnet.railb.be";                                                                                                               
val port : String = "48593";                                                                                                                                
val props = new java.util.Properties();                                                                                                                     
props.put("user", CredentialsStore.getUsername("sas"))                                                                                                      
props.put("password", CredentialsStore.getPassword("sas"))                                                                                                  
props.setProperty("driver", "com.sas.rio.MVADriver")                                                                                                        
val sasconurl : String =  String.format("jdbc:sasiom://%s:%s", host, port);                                                                                 
                                                                                          
object SasDialect extends JdbcDialect {                                                                                                                     
override def canHandle(url: String): Boolean = url.startsWith("jdbc:sasiom")                                                                              
override def quoteIdentifier(colName: String): String = """ + colName + ""n"                                                                            
}                                                                                                                                                           
JdbcDialects.registerDialect(SasDialect)                                                                                                                    
val df = sparkSession.read                                                                                                                                  
.option("url", sasconurl)                                                                                                                                 
.option("driver", "com.sas.rio.MVADriver")                                                                                                                
.option("dbtable", tablename)                                                                                                                             
.option("user",CredentialsStore.getUsername("sas"))                                                                                                       
.option("password",CredentialsStore.getPassword("sas"))                                                                                                   
.option("fetchsize",100)                                                                                                                                  
.format("jdbc")                                                                                                                                           
.load()                                                                                                                                                   
                                                                                          
val strippedDf = sparkSession.createDataFrame(df.rdd.map(r => Row(r.toSeq.map(x => x match {case s: String => s.trim; case _ => x}): _*)), df.schema);      
return strippedDf;                                                                                                                                          
}                                                                                                                                                             
                                                                                          

最新更新