SCD Type 2 merge logic creates duplicates in the target table



I'm trying to implement SCD Type 2 merge logic in BigQuery.

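I'm doing this with a three-step approach, but every time I run the merge script it seems to push records into the target table, even when there are no new records.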

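  • Step 1: When records match on the natural key, deactivate the active record in the target table
  • Step 2: Insert new records that did not previously exist into the target table
  • Step 3: Insert the updated records from the source table into the target table (this is where I get duplicates)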
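The intent of the three steps can be expressed as a classification of each source row against the *current* version of its key in the target: new keys get one insert, changed keys get one expiry plus one insert, and identical keys get nothing. The sketch below is a minimal model in plain Python, not BigQuery code. It assumes a hypothetical representation where the source is a dict `fingerprint -> type2_checksum` and the target is a list of `(fingerprint, checksum, current_flg)` tuples. The names `classify` and `run_scd2` are illustrative only. Running it a second time on the same source leaves the target unchanged, which is the property the merge is expected to have.

```python
def classify(source, target):
    """Split source keys into new, changed and unchanged.

    source: dict fingerprint -> type2_checksum (hypothetical model)
    target: list of (fingerprint, checksum, current_flg) tuples
    """
    # Only the current version ('Y') of each key takes part in change detection.
    current = {fp: ck for fp, ck, flg in target if flg == "Y"}
    new, changed, unchanged = [], [], []
    for fp, ck in sorted(source.items()):
        if fp not in current:
            new.append(fp)        # step 2: no current version yet
        elif current[fp] != ck:
            changed.append(fp)    # steps 1 + 3: expire old version, add new one
        else:
            unchanged.append(fp)  # identical: nothing to do
    return new, changed, unchanged


def run_scd2(source, target):
    """Return a new target list after one merge run."""
    new, changed, _ = classify(source, target)
    # Step 1: flag the current version of every changed key as expired.
    out = [(fp, ck, "N") if flg == "Y" and fp in changed else (fp, ck, flg)
           for fp, ck, flg in target]
    # Steps 2 and 3: exactly one new current row per new or changed key.
    out += [(fp, source[fp], "Y") for fp in new + changed]
    return out


source = {"k1": "b", "k2": "x"}   # k1 changed from 'a' to 'b', k2 is new
target = [("k1", "a", "Y")]
target = run_scd2(source, target)
print(len(target))                          # 3
print(run_scd2(source, target) == target)   # True: a second run changes nothing
```

Change detection here only ever compares against rows flagged `'Y'`. That is the detail that matters when comparing with the SQL below.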

Here is my code:

CREATE
OR REPLACE PROCEDURE `processed.sp_merge_example` (job_run_id INT64) OPTIONS (strict_mode = false)
BEGIN
DECLARE run_id INT64 DEFAULT job_run_id;
---checking for natural keys for updates
MERGE INTO `processed.<Destination table>` AS T
USING `transient.<Source table>` AS S
ON T.<Destination table>_fingerprint = S.<Source table>_fingerprint
WHEN MATCHED
AND S.type2_checksum != T.type2_checksum
AND T.current_flg = 'Y'
THEN
UPDATE
---updating the records in the final table which has updates in the incremental load.
SET T.modified_datetime = current_datetime(),
T.modified_by = CAST(run_id AS STRING),
T.end_datetime = current_datetime(),
T.current_flg = 'N'
WHEN NOT MATCHED
THEN
---inserting new records.
INSERT (
<Source table columns>
source_type,
material_sales_fingerprint,
type2_checksum,
start_datetime,
end_datetime,
current_flg,
created_by,
created_datetime,
modified_by,
modified_datetime
)
VALUES (
<Source table values>
S.source_type,
S.material_sales_fingerprint,
S.type2_checksum,
current_datetime(),
parse_date('%Y%m%d', '99991231'),
'Y',
CAST(run_id AS STRING),
current_datetime(),
CAST(run_id AS STRING),
current_datetime()
);
-- insert the newly updated records
INSERT `processed.dim_material_sales` (
<Source table columns>
source_type,
material_sales_fingerprint,
type2_checksum,
start_datetime,
end_datetime,
current_flg,
created_by,
created_datetime,
modified_by,
modified_datetime
)
SELECT 
<Source table columns>
ST.source_type,
ST.material_sales_fingerprint,
ST.type2_checksum,
current_datetime(),
parse_date('%Y%m%d', '99991231'),
'Y',
CAST(run_id AS STRING),
current_datetime(),
CAST(run_id AS STRING),
current_datetime()
FROM (
SELECT x.*
FROM `transient.<Source table>` x
JOIN `processed.<Destination table>` y 
ON x.<Source table>_fingerprint = y.<Source table>_fingerprint
WHERE x.type2_checksum != y.type2_checksum and y.current_flg = 'N' 
) ST
END; 

The following logic at the end is what causes the duplicates:

ON x.<Source table>_fingerprint = y.<Source table>_fingerprint
WHERE x.type2_checksum != y.type2_checksum and y.current_flg = 'N' 

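If I run the merge logic n times with the same records in the source, I expect the target table to end up with the same records each time. With the logic above, however, the updated records from the source are inserted into the target table again on every run. I have tested this many times and am not sure what is going wrong. Can someone help me understand?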
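One way to see why step 3 keeps inserting is to replay the three steps on a tiny table. After the first run, the old version of a changed key is stored with `current_flg = 'N'` and its old checksum, and nothing ever updates that row again. So `x.type2_checksum != y.type2_checksum AND y.current_flg = 'N'` is still true on every later run, and the join produces the same new version once more. A key with several expired versions can even yield several copies per run, one per matching row.

The sketch below is a minimal reproduction under stated assumptions:

- SQLite (via Python's `sqlite3`) stands in for BigQuery.
- Because SQLite has no `MERGE`, the `MERGE` is emulated by an `UPDATE` followed by an `INSERT ... WHERE NOT EXISTS`.
- The tables `source`/`target` and the columns `fp`/`ck` are hypothetical stand-ins for the fingerprint and `type2_checksum` columns.

The `guarded=True` branch is an alternative step 3, not the code from the question. It only inserts keys that have no current `'Y'` row after steps 1 and 2.

```python
import sqlite3

def make_db():
    # Hypothetical stand-ins: fp = fingerprint (natural key), ck = type2_checksum.
    con = sqlite3.connect(":memory:")
    con.executescript("""
        CREATE TABLE source (fp TEXT, ck TEXT);
        CREATE TABLE target (fp TEXT, ck TEXT, current_flg TEXT);
        INSERT INTO source VALUES ('k1', 'b');
        INSERT INTO target VALUES ('k1', 'a', 'Y');
    """)
    return con

def run_three_steps(con, guarded):
    # Step 1 (MERGE ... WHEN MATCHED): expire current rows whose checksum changed.
    con.execute("""
        UPDATE target SET current_flg = 'N'
        WHERE current_flg = 'Y'
          AND EXISTS (SELECT 1 FROM source s
                      WHERE s.fp = target.fp AND s.ck != target.ck)""")
    # Step 2 (MERGE ... WHEN NOT MATCHED): insert keys that are not in the target.
    con.execute("""
        INSERT INTO target
        SELECT s.fp, s.ck, 'Y' FROM source s
        WHERE NOT EXISTS (SELECT 1 FROM target t WHERE t.fp = s.fp)""")
    if guarded:
        # Alternative step 3: only insert keys that currently have no 'Y' row.
        con.execute("""
            INSERT INTO target
            SELECT x.fp, x.ck, 'Y' FROM source x
            WHERE NOT EXISTS (SELECT 1 FROM target t
                              WHERE t.fp = x.fp AND t.current_flg = 'Y')""")
    else:
        # Step 3 as written in the question: join against EXPIRED ('N') rows.
        con.execute("""
            INSERT INTO target
            SELECT x.fp, x.ck, 'Y' FROM source x
            JOIN target y ON x.fp = y.fp
            WHERE x.ck != y.ck AND y.current_flg = 'N'""")

def current_counts(guarded, runs=3):
    # Number of current ('Y') rows for k1 after each run.
    con = make_db()
    counts = []
    for _ in range(runs):
        run_three_steps(con, guarded)
        counts.append(con.execute(
            "SELECT COUNT(*) FROM target WHERE current_flg = 'Y'").fetchone()[0])
    return counts

print(current_counts(guarded=False))  # [1, 2, 3]
print(current_counts(guarded=True))   # [1, 1, 1]
```

In this simplified model, the count of current rows grows by one per run with the original condition and stays at one with the guarded condition.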

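I worked around the issue with the following logic, which inserts the updated records first and then runs the merge: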

CREATE OR REPLACE PROCEDURE `<stored_proc_name >`(job_run_id INT64) OPTIONS(strict_mode=false)
BEGIN
DECLARE run_id INT64 DEFAULT job_run_id;
-- Inserting updated records coming as part of incremental load to Destination first.
INSERT
`<destination_table>`(
<Target_table_columns>,
tablename_fingerprint,
type2_checksum,
start_datetime,
end_datetime,
current_flg,
created_by,
created_datetime,
modified_by,
modified_datetime)
SELECT
ST.<Target_table_columns>,
current_datetime(),
cast(parse_timestamp('%Y%m%d%H%M%S', '99991231235959') as datetime),
'Y',
CAST(run_id AS STRING),
current_datetime(),
CAST(run_id AS STRING),
current_datetime()
FROM (
SELECT x.*
FROM
`<source_table>` x
JOIN
`<destination_table>` y
ON
y.<destination_table_fingerprint_column> = x.<source_temp_table_fingerprint_column>
Where x.type2_checksum != y.type2_checksum and y.current_flg = 'Y') ST;

---Merge Process Starts

MERGE INTO
`<destination_table>` AS T
USING
`<source_table>` AS S
ON
T.<destination_table_fingerprint_column> = S.<source_temp_table_fingerprint_column>
-- Updating older version of the records.
WHEN MATCHED
AND S.type2_checksum != T.type2_checksum
AND T.current_flg = 'Y'
THEN
UPDATE
SET T.modified_datetime = current_datetime(),
T.modified_by = CAST(run_id AS STRING),
T.end_datetime = current_datetime(),
T.current_flg = 'N'
-- Inserting new records from temp to final table
WHEN NOT MATCHED
THEN
INSERT
(
<Target_table_columns>,
tablename_fingerprint,
type2_checksum,
start_datetime,
end_datetime,
current_flg,
created_by,
created_datetime,
modified_by,
modified_datetime
)
VALUES
(
<S.Target_table_columns>,
current_datetime(),
cast(parse_timestamp('%Y%m%d%H%M%S', '99991231235959') as datetime),
'Y',
CAST(run_id AS STRING),
current_datetime(),
CAST(run_id AS STRING),
current_datetime()
);
END;
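To check that the reordered version is idempotent, the same kind of SQLite stand-in can replay it several times and compare snapshots of the target. Assumptions:

- SQLite via Python's `sqlite3` replaces BigQuery.
- The `MERGE` is emulated by an `UPDATE` followed by an `INSERT ... WHERE NOT EXISTS`. This matches the `MERGE` behaviour here only because the update never changes the fingerprint.
- The tables `source`/`target` and the columns `fp`/`ck` are hypothetical names for the fingerprint and `type2_checksum` columns.

```python
import sqlite3

def make_db():
    # Hypothetical stand-ins: fp = fingerprint (natural key), ck = type2_checksum.
    con = sqlite3.connect(":memory:")
    con.executescript("""
        CREATE TABLE source (fp TEXT, ck TEXT);
        CREATE TABLE target (fp TEXT, ck TEXT, current_flg TEXT);
        INSERT INTO source VALUES ('k1', 'b'), ('k2', 'x');
        INSERT INTO target VALUES ('k1', 'a', 'Y');
    """)
    return con

def run_workaround(con):
    # 1. Insert the new version of changed keys, comparing against CURRENT rows.
    con.execute("""
        INSERT INTO target
        SELECT x.fp, x.ck, 'Y' FROM source x
        JOIN target y ON y.fp = x.fp
        WHERE x.ck != y.ck AND y.current_flg = 'Y'""")
    # 2. MERGE, WHEN MATCHED: expire current rows whose checksum differs.
    #    The row inserted above carries the source checksum, so it stays current.
    con.execute("""
        UPDATE target SET current_flg = 'N'
        WHERE current_flg = 'Y'
          AND EXISTS (SELECT 1 FROM source s
                      WHERE s.fp = target.fp AND s.ck != target.ck)""")
    # 3. MERGE, WHEN NOT MATCHED: insert keys that have no row at all.
    con.execute("""
        INSERT INTO target
        SELECT s.fp, s.ck, 'Y' FROM source s
        WHERE NOT EXISTS (SELECT 1 FROM target t WHERE t.fp = s.fp)""")

con = make_db()
snapshots = []
for run in range(1, 4):
    run_workaround(con)
    rows = con.execute(
        "SELECT fp, ck, current_flg FROM target ORDER BY fp, ck").fetchall()
    snapshots.append(rows)
    print(f"run {run}: {rows}")
# Every run prints [('k1', 'a', 'N'), ('k1', 'b', 'Y'), ('k2', 'x', 'Y')]
```

The key difference from the original step 3 is the join condition `y.current_flg = 'Y'`. Once the new version is current, its checksum equals the source checksum, so later runs find nothing to insert. The expired row is never compared again.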
