I have two tables:

_Insert [main table]
_Edit [contains updates to the _Insert table; it can have multiple entries updating one primary key]

Primary Key: Loan ID, Month
_Insert:

| Loan ID | Loan Details | Month     |
|---------|--------------|-----------|
| 123     | Car Loan     | 01-May-22 |
| 124     | House Loan   | 31-Apr-22 |
_Edit:

| Loan ID | Loan Details | Month     |
|---------|--------------|-----------|
| 123     | Student Loan | 02-May-22 |
| 123     | House Loan   | 05-May-22 |
Now I am trying to use dropDuplicates on a DataFrame that is a merge of the two Delta tables [Insert + Edit]:
df_Insert = spark.sql("""SELECT * FROM insertedit""")
df_Insert_filter = df_Insert.dropDuplicates(['Loan_ID', 'Month'])
This does not seem to do what is required. When loading from the Delta lake into SQL, it shows the error: Primary Key Violation, Cannot insert duplicate key in object 'dbo.table'.

What should I do in this case?
The requirement is a bit unclear, but if you want to remove duplicates based on the Loan ID and Month columns, you can use the code below.
from pyspark.sql.window import Window as W
from pyspark.sql.functions import col, desc, row_number

# Keep only the most recent row (by Month) for each Loan ID
win_func = W.partitionBy('loanid').orderBy(desc('month'))
df1 = (df.withColumn('rownum', row_number().over(win_func))
         .filter(col('rownum') == 1)
         .drop('rownum'))
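For reference, the keep-the-latest-row-per-key semantics of that window query can be sketched in plain Python against the sample rows above (column names and the merged row set are assumptions taken from the example tables; the "31-Apr-22" row is written here as 30 April, since April has no 31st):

```python
from datetime import date

# Merged rows from _Insert and _Edit: (Loan ID, Loan Details, Month)
rows = [
    (123, "Car Loan",     date(2022, 5, 1)),
    (124, "House Loan",   date(2022, 4, 30)),  # "31-Apr-22" in the example
    (123, "Student Loan", date(2022, 5, 2)),
    (123, "House Loan",   date(2022, 5, 5)),
]

# Equivalent of partitionBy('loanid').orderBy(desc('month')) + row_number() == 1:
# for each Loan ID, keep only the row with the latest Month.
latest = {}
for loan_id, details, month in rows:
    if loan_id not in latest or month > latest[loan_id][2]:
        latest[loan_id] = (loan_id, details, month)

result = sorted(latest.values())
# One row per Loan ID remains, so the SQL primary key (Loan ID) is no longer violated.
```

Note that this keeps one row per Loan ID. If the SQL primary key really is (Loan ID, Month), deduplicating on both columns would still leave the two 123 edit rows, so deciding which column(s) must be unique in the target table is the key design choice here.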