我正在执行以下数据块增量表的合并操作-
spark.sql(""" MERGE INTO <delta table name> deltatbl USING <temp view> source
ON deltatbl.col1 = source.col1
AND deltatbl.col2 = source.col2
WHEN NOT MATCHED THEN INSERT
(col1,col2) VALUES(source.Col1,source.Col2) """)
上面的查询是插入重复记录,尽管匹配唯一的键。我如何才能实现输出,只有不匹配的记录被插入。所有列都是key的一部分。
如果要更新现有记录:
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET events.data = updates.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
如果你只想插入不存在的记录:用相同的值更新
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET events.data = events.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
你的箱子,
MERGE INTO <delta table name> deltatbl USING <temp view> source
ON deltatbl.col1 = source.col1
AND deltatbl.col2 = source.col2
WHEN MATCHED THEN
UPDATE SET deltatbl.data = deltatbl.data
WHEN NOT MATCHED THEN INSERT
(col1,col2) VALUES(source.Col1,source.Col2)
据我所知,一个问题可能是您插入的内容在其中有重复项。
当NOT MATCHED时,INSERT在插入前不重复数据删除。为了不将重复项加载到表中,您必须在运行合并之前删除重复项。
你可以通过python API来完成:
spark.table('{your_view_name_here}').dropDuplicates()
或者通过SQL API:
SELECT DISTINCT(*)
FROM {your_view_name_here}
没有更多的信息,这是我能提供的最好的猜测。