嗯,我正在使用PySpark,我有一个Spark数据框架,我使用它将数据插入mysql表。
url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"
df.write.jdbc(url=url, table="myTable", mode="append")
我想通过它的列值和一个特定的数字来更新一个列值(它不在主键中)。
我已经尝试了不同的模式(追加,覆盖)DataFrameWriter.jdbc()函数。
我的问题是我们如何更新列值,因为我们在mysql中使用ON DUPLICATE KEY UPDATE
,同时将pyspark数据框数据插入到表中。
这在香草pyspark
(或Scala Spark)中是不可能的,因为您只有4种写入模式(来源https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc):
append:将这个DataFrame的内容追加到现有数据中。
overwrite:覆盖现有数据。
ignore:如果数据已经存在,则静默忽略此操作。
error or errorifexists(默认情况):如果数据已经存在则抛出异常。
不过,这里有一些变通的方法:
-
有一个
jython
包,允许您直接编写jdbc
查询,因此您可以将代码结构为INSERT ... ON DUPLICATE KEY UPDATE ...
。链接:https://pypi.org/project/JayDeBeApi/ -
如果你精通Scala,你可以写一个新的模式或者覆盖
org.apache.spark.sql.execution.datasources.jdbc
和JdbcUtils.scala INSERT INTO
到INSERT ... ON DUPLICATE KEY UPDATE ...
。或者更好,使用MERGE
语句,如:
MERGE INTO table-name
USING table-ref
AS name
ON cond
WHEN NOT MATCHED THEN INSERT
WHEN MATCHED THEN UPDATE
取决于你的SQL风格。
使用暂存表覆盖,然后在此暂存环境上编写一个简单的
mysql
触发器,以使其运行INSERT INTO target_table ON DUPLICATE KEY UPDATE
。将您的Spark DataFrame移动到
pandas
DataFrame,并使用sqlalchemy
和原始查询在那里编写upsert
查询。使用Apache Kafka支持的Spark Streaming创建一个管道,然后使用具有jdbc upsert功能的工具,例如Kafka Connect to
upsert
直接到您的目标表。或者使用Kafka连接upserting
从staging表到目标表。这里有一些阅读https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html#idempotent-writes
一种解决方法是将数据插入到staging表中,然后使用驱动程序执行的SQL语句将其迁移到最终表中。然后可以使用与数据库提供程序相关的任何有效SQL语法。