问题:
我想使用 JDBC 连接使用 Spark 发出自定义请求。
此查询的目标是优化工作线程上的内存分配,因此我无法使用:
ss.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
现在:
我目前正在尝试运行:
ss = SparkSession
.builder()
.appName(appName)
.master("local")
.config(conf)
.getOrCreate()
ss.sql("some custom query")
配置:
url=jdbc:mysql://127.0.0.1/database_name
driver=com.mysql.jdbc.Driver
user=user_name
password=xxxxxxxxxx
错误:
[info] Exception encountered when attempting to run a suite with class name: db.TestUserProvider *** ABORTED ***
[info] org.apache.spark.sql.AnalysisException: Table or view not found: users; line 1 pos 14
[info] at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:459)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:478)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
假设:
我想有一个配置错误,但我找不到在哪里。
Spark 可以使用 JDBC 数据源在关系数据库中读取和写入数据(就像您在第一个代码示例中所做的那样(。
此外(并且完全分开(,Spark 允许使用 SQL 查询基于已从某个源加载到 DataFrame 中的数据创建的视图。例如:
val df = Seq(1,2,3).toDF("a") // could be any DF, loaded from file/JDBC/memory...
df.createOrReplaceTempView("my_spark_table")
spark.sql("select a from my_spark_table").show()
只有以这种方式创建的"表"(从 Spark 2.0.0 开始称为视图(才能使用 SparkSession.sql
进行查询。
如果你的数据存储在关系数据库中,Spark必须首先从那里读取它,然后它才能在加载的副本上执行任何分布式计算。 底线 - 我们可以使用 read
从表中加载数据,创建一个临时视图,然后查询它:
ss.read
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1/database_name")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
.createOrReplaceTempView("my_spark_table")
// and then you can query the view:
val df = ss.sql("select * from my_spark_table where ... ")