嘿伙计们,
我需要使用 Apache Spark DataFrame 执行 jdbc 操作。基本上,我有一个名为 Measures 的历史 jdbc 表,我必须在其中执行两个操作:
1. 将旧度量记录的结束时间有效性属性设置为当前时间
2. 插入新的度量记录设置 endTime 到 9999-12-31
有人可以告诉我如何执行(如果可以的话)第一个操作的更新语句并为第二个操作插入吗?
我尝试将此语句用于第一个操作:
val dfWriter = df.write.mode(SaveMode.Overwrite)
dfWriter.jdbc("jdbc:postgresql:postgres", tableName, prop)
但它不起作用,因为存在重复的密钥冲突。如果我们可以做更新,我们如何做删除语句?
提前谢谢。
我认为Spark还没有开箱即用地支持它。你可以做什么 使用 foreachRDD() 循环遍历数据帧/RDD,并使用 JDBC API 手动更新/删除表。
这是一个类似问题的链接:Spark Dataframes UPSERT to Postgres Table