我正在尝试使用Spark的JDBC访问存储在远程群集上的表(ORC格式(:
val jdbcDF = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", "metrics")
.option("user", user)
.option("password", password)
.load()
但是,无论我做什么,我都会遇到这个错误:
引起的: java.lang.numberformatexception:for Input String:" Metrics.t" at org.apache.hive.jdbc.hivebaseresultset.getlong(hivebaseresultset.java:372(at org.apache.spark.sql.execution.dataSources.jdbc.jdbcutils $$ 在 org.apache.spark.sql.execution.datasources.jdbc.jdbc.jdbcutils $$ anonfun $ org org $ org $ apache $ sparch $ sql $ sql $ execution $ dataSources $ jdbc $ jdbcutils $ jdbcutils $ jdbcuttils $ jumphegetter $ 8.papply $ 8.apply(jdbcutils.scala:364(:364( 在 org.apache.spark.sql.execution.datasources.jdbc.jdbcutils $$ anon $ 1.getNext(jdbcutils.scala:286( 在 org.apache.spark.sql.execution.datasources.jdbc.jdbcutils $$ anon $ 1.getNext(jdbcutils.scala:268( atrg.apache.spark.util.nextiterator.hasnext(nextIterator.scala:73( 在 org.apache.spark.util.completioniterator.hasnext(posterioniterator.scala:32( 在 org.apache.spark.sql.catalyst.expressions.generatedClass $ generatedIterator.processnext(未知 来源( org.apache.spark.sql.execution.bufferedrowiterator.hasnext(BufferedRowiterator.java:43( 在 org.apache.spark.sql.execution.wholestagecodegenexec $$ anonfun $ 8 $$ anon $ 1.hasnext(holestagecodegenexec.scala:377( 在 org.apache.spark.sql.execution.sparkplan $$ anonfun $ 2.Apply(SparkPlan.Scala:231( 在 org.apache.spark.sql.execution.sparkplan $$ anonfun $ 2.Apply(SparkPlan.Scala:225( 在 org.apache.spark.rdd.rdd $$ anonfun $ mappartitionsinternal $ 1 $$ anonfun $ apply $ 25.apply(rdd.scala:826( 在 org.apache.spark.rdd.rdd $$ anonfun $ mappartitionsinternal $ 1 $$ anonfun $ apply $ 25.apply(rdd.scala:826( 在 org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrd.scala:38( 请访问org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:323( atrg.apache.spark.rdd.rdd.iterator(rdd.scala:287(at org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87(at org.apache.spark.scheduler.task.run(task.scala:99(at org.apache.spark.executor.executor $ taskrunner.run(executor.scala:282( 在 java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142( 在 java.util.concurrent.threadpoolexecutor $ worker.run(threadpoolexecutor.java:617( 在java.lang.thread.run(thread.java:745( 由以下方式引起 java.lang.numberformatexception:for Input String:" Metrics.t" at java.lang.numberformatexception.forinputString(numberFormateXception.java:65( 在java.lang.long.long.parselong(long.java:589( java.lang.long.parselong(long.java:631( org.apache.hive.jdbc.hivebaseresultset.getlong(hivebaseresultset.java:368( ... 22多
输入字符串" Metrics.t"对应于表格和第二列的名称," T",其具有时间戳为长。
如何使用JDBC格式跳过标题?
CSV选项(" header",true(在我的情况下没有影响。
ps:Spark版本2.1.0
代码不会以以下实现提供任何例外:
val jdbcUrl = s"jdbc:hive2://$jdbcHostname:$jdbcPort/$jdbcDatabase"
val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", jdbcUsername)
connectionProperties.setProperty("password", jdbcPassword)
val jdbcDF = spark.read.jdbc(jdbcUrl, "metrics", Array(), connectionProperties)
奇怪的是,如果我删除了空谓词Array()
,则例外又回来了。
因为spark jdbcdialect将双引号标记用作QuoteIdentifier,并且不提供HIVEDIALECT(例如MySQL不同(。
因此,Spark将通过JDBC:select "some_column_name" from table
将这种SQL发送到Hive,而"some_column_name"
原来是字符串标量,而不是列名。
val jdbcDF = spark.read.jdbc(jdbcUrl, "metrics", Array(), connectionProperties)
通过此代码行,您告诉Spark可以生成JDBC数据框架,而无需任何分区。因此,没有实际数据获取SQL被发送到Hive,Spark只会给您一个空的数据框。
唯一正确的方法是实现相应的方言:如何从JDBC创建Spark DataFrame时如何指定SQL方言?
我在初始化火花时启用了蜂巢支持,对我有用:
SparkSession spark = new SparkSession.Builder()
.master("local")
.appName("test")
.enableHiveSupport()
.getOrCreate();