当通过JDBC从pyspark数据框插入到外部数据库表时,ON DUPLICATE KEY UPDATE



嗯,我正在使用PySpark,我有一个Spark数据框架,我使用它将数据插入mysql表。

url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"

df.write.jdbc(url=url, table="myTable", mode="append")

我想通过它的列值和一个特定的数字来更新一个列值(它不在主键中)。

我已经尝试了不同的模式(追加,覆盖)DataFrameWriter.jdbc()函数。

我的问题是我们如何更新列值,因为我们在mysql中使用ON DUPLICATE KEY UPDATE,同时将pyspark数据框数据插入到表中。

这在香草pyspark(或Scala Spark)中是不可能的,因为您只有4种写入模式(来源https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc):

append:将这个DataFrame的内容追加到现有数据中。

overwrite:覆盖现有数据。

ignore:如果数据已经存在,则静默忽略此操作。

error or errorifexists(默认情况):如果数据已经存在则抛出异常。

不过,这里有一些变通的方法:

  1. 有一个jython包,允许您直接编写jdbc查询,因此您可以将代码结构为INSERT ... ON DUPLICATE KEY UPDATE ...。链接:https://pypi.org/project/JayDeBeApi/

  2. 如果你精通Scala,你可以写一个新的模式或者覆盖org.apache.spark.sql.execution.datasources.jdbcJdbcUtils.scala INSERT INTOINSERT ... ON DUPLICATE KEY UPDATE ...。或者更好,使用MERGE语句,如:

MERGE INTO table-name
USING table-ref
AS name
ON cond
WHEN NOT MATCHED THEN INSERT 
WHEN MATCHED THEN UPDATE

取决于你的SQL风格。

  • 使用暂存表覆盖,然后在此暂存环境上编写一个简单的mysql触发器,以使其运行INSERT INTO target_table ON DUPLICATE KEY UPDATE

  • 将您的Spark DataFrame移动到pandas DataFrame,并使用sqlalchemy和原始查询在那里编写upsert查询。

  • 使用Apache Kafka支持的Spark Streaming创建一个管道,然后使用具有jdbc upsert功能的工具,例如Kafka Connect to upsert直接到您的目标表。或者使用Kafka连接upserting从staging表到目标表。这里有一些阅读https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html#idempotent-writes

  • 一种解决方法是将数据插入到staging表中,然后使用驱动程序执行的SQL语句将其迁移到最终表中。然后可以使用与数据库提供程序相关的任何有效SQL语法。

    相关内容

    • 没有找到相关文章

    最新更新