这个问题的目标是记录:
-
在PySpark 中使用JDBC连接读取和写入数据所需的步骤
-
JDBC源和已知解决方案可能存在的问题
只要做一些小的更改,这些方法就可以与其他支持的语言一起使用,包括Scala和R.。
写入数据
-
在提交应用程序或启动shell时包含适用的JDBC驱动程序。例如,您可以使用
--packages
:bin/pyspark --packages group:name:version
或将driver-class-path
和jars
组合
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
还可以在JVM实例启动之前使用PYSPARK_SUBMIT_ARGS
环境变量设置这些属性,或者使用conf/spark-defaults.conf
设置spark.jars.packages
或spark.jars
/spark.driver.extraClassPath
。
选择所需的模式。Spark JDBC编写器支持以下模式:
append
:将this:class:DataFrame
的内容附加到现有数据中overwrite
:覆盖已有数据ignore
:如果数据已经存在,则静默忽略此操作error
(默认情况):如果数据已经存在,则引发异常
不支持Upserts或其他细粒度修改
mode = ...
准备JDBC URI,例如:
# You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar"
(可选)创建JDBC参数的字典。
properties = { "user": "foo", "password": "bar" }
properties
/options
还可以用于设置支持的JDBC连接属性。使用
DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
保存数据(详见pyspark.sql.DataFrameWriter
)。
已知问题:
当使用
--packages
(java.sql.SQLException: No suitable driver found for jdbc: ...
)包含驱动程序时,找不到合适的驱动程序假设没有驱动程序版本不匹配来解决这个问题,您可以将
driver
类添加到properties
中。例如:properties = { ... "driver": "org.postgresql.Driver" }
使用
df.write.format("jdbc").options(...).save()
可能导致:java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允许将表创建为select。
解决方案未知。
在Pyspark 1.3中,您可以尝试直接调用Java方法:
df._jdf.insertIntoJDBC(url, "baz", True)
正在读取数据
按照写入数据中的步骤1-4进行操作
使用
sqlContext.read.jdbc
:sqlContext.read.jdbc(url=url, table="baz", properties=properties)
或sqlContext.read.format("jdbc")
:
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz", **properties)
.load())
已知问题和问题:
找不到合适的驱动程序-请参阅:写入数据
Spark SQL支持JDBC源的谓词下推,尽管并非所有谓词都可以下推。它也不委托限制或聚合。可能的解决方法是用有效的子查询替换
dbtable
/table
参数。参见示例:- spark谓词下推与JDBC一起工作吗
- 执行pyspark.sql.DataFrame.take超过一个小时(4)
- 如何使用SQL查询在dbtable中定义表
默认情况下,JDBC数据源使用单个执行器线程按顺序加载数据。为了确保分布式数据加载,您可以:
- 提供分区
column
(必须是IntegerType
)、lowerBound
、upperBound
和numPartitions
- 提供互斥谓词
predicates
的列表,每个谓词对应一个所需的分区
参见:
- 通过JDBC从RDBMS读取数据时在spark中进行分区
- 从JDBC源迁移数据时如何优化分区
- 如何使用DataFrame和JDBC连接来提高慢速Spark作业的性能
- 使用JDBC导入Postgres时,如何对Spark RDD进行分区
- 提供分区
在分布式模式中(使用分区列或谓词),每个执行器都在自己的事务中操作。如果同时修改源数据库,则不能保证最终视图是一致的。
在哪里可以找到合适的司机:
Maven Repository(要获得
--packages
所需的坐标,请选择所需的版本,并以compile-group:name:version
的形式从Gradle选项卡复制数据,替换相应的字段)或Maven Central Repository:- PostgreSQL
- MySQL
其他选项
根据数据库的不同,可能存在专门的来源,在某些情况下更可取:
- Greenplum-枢轴式Greenplum火花连接器
- Apache Phoenix-Apache Spark插件
- Microsoft SQL Server-用于Azure SQL数据库和SQL Server的Spark连接器
- AmazonRedshift-Databricks Redshift连接器(当前版本仅在专有的DatabricksRuntime中提供。已停产的开源版本,可在GitHub上获得)
下载mysql连接器java驱动程序并保存在spark jar文件夹中,观察下面的python代码将数据写入"acotr1",我们必须在mysql数据库中创建acotr1表结构
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:spark-2.1.0-bin-hadoop2.7jarsmysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"
df.write.jdbc(mysql_url,table="actor1",mode="append")
请参阅此链接下载用于postgres的jdbc,并按照步骤下载jar文件
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.htmljar文件将按照如下路径下载。"/home/anand/.ivy2/jars.org/postgresql_postgresql-42.1.1.jar"
如果你的火花版本是2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:postgres")
.option("dbtable", "public.user_emp_tab")
.option("user", "postgres")
.option("password", "Jonsnow@100")
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
将文件保存为python并运行"python respectivefilename.py"