Snowflakehow如何按特定顺序对同一主键执行多个DML操作



我正在尝试在Snowflake中设置连续数据复制。我得到在源系统中发生的事务,我需要在Snowflake中按照与源系统相同的顺序执行它们。我正在尝试使用MERGE,但当在源系统中对同一个密钥有多个操作时,MERGE就不能正常工作。它要么错过了一个操作,要么返回在DML操作错误期间检测到的重复行。

请注意,交易需要按照准确的顺序进行,不可能将最新的交易作为密钥并直接执行(就像记录已经插入并更新一样,在Snowflake中,它也需要先插入然后更新,即使插入只是暂时状态(。

以下是示例:

create or replace table employee_source (
id int,
first_name varchar(255),
last_name varchar(255),
operation_name varchar(255),
binlogkey integer
)
create or replace table employee_destination ( id int, first_name varchar(255), last_name varchar(255) );
insert into employee_source values (1,'Wayne','Bells','INSERT',11);
insert into employee_source values (1,'Wayne','BellsT','UPDATE',12);
insert into employee_source values (2,'Anthony','Allen','INSERT',13);
insert into employee_source values (3,'Eric','Henderson','INSERT',14);
insert into employee_source values (4,'Jimmy','Smith','INSERT',15);
insert into employee_source values (1,'Wayne','Bellsa','UPDATE',16);
insert into employee_source values (1,'Wayner','Bellsat','UPDATE',17);
insert into employee_source values (2,'Anthony','Allen','DELETE',18); 
MERGE into employee_destination as T using (select * from employee_source order by binlogkey) 
AS S
ON T.id = s.id
when not matched
And S.operation_name = 'INSERT' THEN
INSERT (id,
first_name,
last_name)
VALUES (
S.id,    
S.first_name,
S.last_name)
when matched AND S.operation_name = 'UPDATE'
THEN
update set T.first_name = S.first_name, T.last_name = S.last_name
When matched
And S.operation_name = 'DELETE' THEN DELETE;

在处理完所有行之后,我希望在employee_destination表中看到Bellsat作为员工id 1的姓氏。同样,我不应该在employee_destination表中看到emp id 2。

除了合并,还有其他选择吗?基本上以相同的顺序遍历每个DML(使用binlogkey列进行排序(。

谢谢。

您需要操作源数据,以确保每个键/操作只有一条记录,否则联接将是不确定的,并且将(根据您的设置(出错或使用随机的一条适用源记录进行更新。此处的文档中包含了这一点https://docs.snowflake.com/en/sql-reference/sql/merge.html#duplicate-加入行为。

无论如何,为什么你只想更新一条记录,让它被另一个更新覆盖——这会非常低效?

由于您的更新似乎包括所有行的新值,因此您可以使用窗口函数来获取最新的传入更改,然后将这些结果合并到目标表中。例如,该合并的选择(使用窗口功能只获取最新的更改(如下所示:

with SOURCE_DATA as
(
select   COLUMN1::int       ID
,COLUMN2::string    FIRST_NAME
,COLUMN3::string    LAST_NAME
,COLUMN4::string    OPERATION_NAME
,COLUMN5::int       PROCESSING_ORDER
from values 
(1,'Wayne','Bells','INSERT',11),
(1,'Wayne','BellsT','UPDATE',12),
(2,'Anthony','Allen','INSERT',13),
(3,'Eric','Henderson','INSERT',14),
(4,'Jimmy','Smith','INSERT',15),
(1,'Wayne','Bellsa','UPDATE',16),
(1,'Wayne','Bellsat','UPDATE',17),
(2,'Anthony','Allen','DELETE',18)
)
select * from SOURCE_DATA
qualify row_number() over (partition by ID order by PROCESSING_ORDER desc) = 1

这将产生一个结果集,该结果集只有合并到目标表所需的更改:

><17>
IDFIRST_NAME
1WayneBellsat更新
2Anthony3Eric

最新更新