我在尝试将JDBC DataFrame加载到Spark SQL中时遇到了一个非常奇怪的问题。
我在笔记本电脑上尝试了几个Spark集群——YARN、独立集群和伪分布式模式。它在Spark 1.3.0和1.3.1上都是可复制的。问题出现在spark-shell
和使用spark-submit
执行代码时。我试过MySQL;MS SQL JDBC驱动程序没有成功。
考虑以下示例:
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://localhost:3306/test"
val t1 = {
sqlContext.load("jdbc", Map(
"url" -> url,
"driver" -> driver,
"dbtable" -> "t1",
"partitionColumn" -> "id",
"lowerBound" -> "0",
"upperBound" -> "100",
"numPartitions" -> "50"
))
}
到目前为止,模式得到了正确的解决:
t1: org.apache.spark.sql.DataFrame = [id: int, name: string]
但当我评估DataFrame:时
t1.take(1)
出现以下异常:
15/04/29 01:56:44 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.42): java.sql.SQLException: No suitable driver found for jdbc:mysql://<hostname>:3306/test
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:270)
at org.apache.spark.sql.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:158)
at org.apache.spark.sql.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:150)
at org.apache.spark.sql.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:317)
at org.apache.spark.sql.jdbc.JDBCRDD.compute(JDBCRDD.scala:309)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
当我尝试在executor上打开JDBC连接时:
import java.sql.DriverManager
sc.parallelize(0 until 2, 2).map { i =>
Class.forName(driver)
val conn = DriverManager.getConnection(url)
conn.close()
i
}.collect()
它工作得很好:
res1: Array[Int] = Array(0, 1)
当我在本地Spark上运行相同的代码时,它也能完美地工作:
scala> t1.take(1)
...
res0: Array[org.apache.spark.sql.Row] = Array([1,one])
我使用的是Spark预构建的Hadoop2.4支持。
重现问题的最简单方法是使用start-all.sh
脚本以伪分布式模式启动Spark,并运行以下命令:
/path/to/spark-shell --master spark://<hostname>:7077 --jars /path/to/mysql-connector-java-5.1.35.jar --driver-class-path /path/to/mysql-connector-java-5.1.35.jar
有办法解决这个问题吗?这看起来是一个严重的问题,所以奇怪的是谷歌搜索在这里没有帮助。
显然,这个问题最近已经被报道:
https://issues.apache.org/jira/browse/SPARK-6913
问题出在java.sql.DriverManager中,它看不到除引导ClassLoader之外的ClassLoader加载的驱动程序。
作为一种临时解决方法,可以将所需的驱动程序添加到执行器的引导类路径中。
更新:此pull请求修复了问题:https://github.com/apache/spark/pull/5782
更新2:修复程序合并到Spark 1.4
用于将数据写入MySQL
在spark 1.4.0中,在写入MySQL之前必须先加载MySQL,因为它在加载函数上加载驱动程序,而在写入函数上不加载驱动程序。我们必须在每个工作节点上放置jar,并在每个节点的spark-defaults.conf文件中设置路径。此问题已在spark 1.5.0 中修复
https://issues.apache.org/jira/browse/SPARK-10036
我们被困在Spark 1.3(Cloudera 5.4)上,所以我发现这个问题和Wildfire的答案很有帮助,因为它让我不再把头撞到墙上。
我想和大家分享一下我们是如何将驱动程序放入引导类路径的:我们只是将其复制到所有节点上的/opt/cloudera/plocks/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hive/lib中。
我在SQL server中使用spark-1.6.1,仍然面临同样的问题。我不得不将库(sqljdbc-4.0.jar)添加到实例中的lib中,并在conf/spark-dfault.conf
文件的行下添加。
spark.driver.extraClassPath lib/sqljdbc-4.0.jar