如何使用JDBC源代码在(Py)Spark中写入和读取数据



这个问题的目标是记录:

  • 在PySpark 中使用JDBC连接读取和写入数据所需的步骤

  • JDBC源和已知解决方案可能存在的问题

只要做一些小的更改,这些方法就可以与其他支持的语言一起使用,包括Scala和R.。

写入数据

  1. 在提交应用程序或启动shell时包含适用的JDBC驱动程序。例如,您可以使用--packages:

     bin/pyspark --packages group:name:version  
    

或将driver-class-pathjars 组合

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

还可以在JVM实例启动之前使用PYSPARK_SUBMIT_ARGS环境变量设置这些属性,或者使用conf/spark-defaults.conf设置spark.jars.packagesspark.jars/spark.driver.extraClassPath

  1. 选择所需的模式。Spark JDBC编写器支持以下模式:

    • append:将this:class:DataFrame的内容附加到现有数据中
    • overwrite:覆盖已有数据
    • ignore:如果数据已经存在,则静默忽略此操作
    • error(默认情况):如果数据已经存在,则引发异常

    不支持Upserts或其他细粒度修改

     mode = ...
    
  2. 准备JDBC URI,例如:

     # You can encode credentials in URI or pass
     # separately using properties argument
     # of jdbc method or options
     url = "jdbc:postgresql://localhost/foobar"
    
  3. (可选)创建JDBC参数的字典。

     properties = {
         "user": "foo",
         "password": "bar"
     }
    

    properties/options还可以用于设置支持的JDBC连接属性。

  4. 使用DataFrame.write.jdbc

     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

保存数据(详见pyspark.sql.DataFrameWriter)。

已知问题

  • 当使用--packagesjava.sql.SQLException: No suitable driver found for jdbc: ...)包含驱动程序时,找不到合适的驱动程序

    假设没有驱动程序版本不匹配来解决这个问题,您可以将driver类添加到properties中。例如:

      properties = {
          ...
          "driver": "org.postgresql.Driver"
      }
    
  • 使用df.write.format("jdbc").options(...).save()可能导致:

    java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允许将表创建为select。

    解决方案未知。

  • 在Pyspark 1.3中,您可以尝试直接调用Java方法:

      df._jdf.insertIntoJDBC(url, "baz", True)
    

正在读取数据

  1. 按照写入数据中的步骤1-4进行操作

  2. 使用sqlContext.read.jdbc:

     sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())

已知问题和问题

  • 找不到合适的驱动程序-请参阅:写入数据

  • Spark SQL支持JDBC源的谓词下推,尽管并非所有谓词都可以下推。它也不委托限制或聚合。可能的解决方法是用有效的子查询替换dbtable/table参数。参见示例:

    • spark谓词下推与JDBC一起工作吗
    • 执行pyspark.sql.DataFrame.take超过一个小时(4)
    • 如何使用SQL查询在dbtable中定义表
  • 默认情况下,JDBC数据源使用单个执行器线程按顺序加载数据。为了确保分布式数据加载,您可以:

    • 提供分区column(必须是IntegerType)、lowerBoundupperBoundnumPartitions
    • 提供互斥谓词predicates的列表,每个谓词对应一个所需的分区

    参见:

    • 通过JDBC从RDBMS读取数据时在spark中进行分区
    • 从JDBC源迁移数据时如何优化分区
    • 如何使用DataFrame和JDBC连接来提高慢速Spark作业的性能
    • 使用JDBC导入Postgres时,如何对Spark RDD进行分区
  • 在分布式模式中(使用分区列或谓词),每个执行器都在自己的事务中操作。如果同时修改源数据库,则不能保证最终视图是一致的。

在哪里可以找到合适的司机:

  • Maven Repository(要获得--packages所需的坐标,请选择所需的版本,并以compile-group:name:version的形式从Gradle选项卡复制数据,替换相应的字段)或Maven Central Repository:

    • PostgreSQL
    • MySQL

其他选项

根据数据库的不同,可能存在专门的来源,在某些情况下更可取:

  • Greenplum-枢轴式Greenplum火花连接器
  • Apache Phoenix-Apache Spark插件
  • Microsoft SQL Server-用于Azure SQL数据库和SQL Server的Spark连接器
  • AmazonRedshift-Databricks Redshift连接器(当前版本仅在专有的DatabricksRuntime中提供。已停产的开源版本,可在GitHub上获得)

下载mysql连接器java驱动程序并保存在spark jar文件夹中,观察下面的python代码将数据写入"acotr1",我们必须在mysql数据库中创建acotr1表结构

    spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:spark-2.1.0-bin-hadoop2.7jarsmysql-connector-java-5.1.41-bin.jar').getOrCreate()
    sc = spark.sparkContext
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
    df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()
    mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"
    df.write.jdbc(mysql_url,table="actor1",mode="append")

请参阅此链接下载用于postgres的jdbc,并按照步骤下载jar文件

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.htmljar文件将按照如下路径下载。"/home/anand/.ivy2/jars.org/postgresql_postgresql-42.1.1.jar"

如果你的火花版本是2

from pyspark.sql import SparkSession
spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()
//for localhost database//
pgDF = spark.read 
.format("jdbc") 
.option("url", "jdbc:postgresql:postgres") 
.option("dbtable", "public.user_emp_tab") 
.option("user", "postgres") 
.option("password", "Jonsnow@100") 
.load()

print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()

将文件保存为python并运行"python respectivefilename.py"

相关内容

  • 没有找到相关文章

最新更新