How to ingest a large stream table with a Snowflake Task



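I'm using the Snowflake Kafka Sink Connector to ingest data from Debezium into a Snowflake table. I created a Stream and a Task on this table. As data from Kafka lands in the source table, the stream is populated, and the task runs a MERGE command to write the data into the final table.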

However, now that the stream has grown to a moderate size, about 50 million rows, the task can no longer run to completion and times out.
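To see how big the backlog is, and roughly how many of those rows survive the deduplication in the MERGE, a query along these lines can be run against the stream. This is a sketch that reuses the column paths from the MERGE statement; note that COUNT(DISTINCT ...) over several expressions skips rows where any of them is NULL, so the key count is approximate.

```sql
-- A plain SELECT does not advance the stream offset, so this is safe to run.
SELECT
  COUNT(*) AS stream_rows,
  -- rows with a NULL in either key part are not counted here
  COUNT(DISTINCT
    COALESCE(record_content:payload:after.id::VARCHAR,
             record_content:payload:before.id::VARCHAR),
    COALESCE(record_content:payload:after.agency_id::VARCHAR,
             record_content:payload:before.agency_id::VARCHAR)) AS distinct_keys
FROM raw.message_stream;
```

If distinct_keys is far smaller than stream_rows, most of the 50M rows are superseded changes that the QUALIFY throws away.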

To solve this, I tried the following:

  1. Increased the task timeout from 1 hour to 24 hours
  2. Increased the warehouse size to Medium
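The question doesn't show the exact commands used for these two changes. As a sketch, assuming the task is called `raw.message_merge_task` and runs on a warehouse called `my_wh` (both hypothetical names), they would look roughly like this:

```sql
-- Suspend first; changing properties on a suspended task is the safe order.
ALTER TASK raw.message_merge_task SUSPEND;

-- 24 hours in milliseconds: 24 * 60 * 60 * 1000 = 86400000
ALTER TASK raw.message_merge_task SET USER_TASK_TIMEOUT_MS = 86400000;

-- Resize the warehouse the task uses (my_wh is a placeholder name)
ALTER WAREHOUSE my_wh SET WAREHOUSE_SIZE = 'MEDIUM';

ALTER TASK raw.message_merge_task RESUME;
```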

The task still did not finish and timed out after 24 hours.

Is a larger warehouse needed to handle 50M rows? How can I get the task to run to completion?

MERGE statement

MERGE INTO TARGET.MESSAGE AS P
USING (SELECT RECORD_CONTENT:payload:before.id::VARCHAR          AS BEFORE_ID,
RECORD_CONTENT:payload:before.agency_id::VARCHAR   AS BEFORE_AGENCY_ID,
RECORD_CONTENT:payload:after.id::VARCHAR           AS AFTER_ID,
RECORD_CONTENT:payload:after.agency_id::VARCHAR    AS AFTER_AGENCY_ID,
RECORD_CONTENT:payload:after::VARIANT              AS PAYLOAD,
RECORD_CONTENT:payload:source.ts_ms::INT           AS TS_MS,
RECORD_CONTENT:payload:op::VARCHAR                 AS OP
FROM RAW.MESSAGE_STREAM
QUALIFY ROW_NUMBER() OVER (
PARTITION BY COALESCE(AFTER_ID, BEFORE_ID), COALESCE(AFTER_AGENCY_ID, BEFORE_AGENCY_ID)
ORDER BY TS_MS DESC
) = 1) PS ON (P.ID = PS.AFTER_ID AND P.AGENCY_ID = PS.AFTER_AGENCY_ID) OR
(P.ID = PS.BEFORE_ID AND P.AGENCY_ID = PS.BEFORE_AGENCY_ID)
WHEN MATCHED AND PS.OP = 'd' THEN DELETE
WHEN MATCHED AND PS.OP IN ('u', 'r') THEN UPDATE SET P.PAYLOAD = PS.PAYLOAD, P.TS_MS = PS.TS_MS
WHEN NOT MATCHED AND PS.OP IN ('c', 'r', 'u') THEN INSERT (P.ID, P.AGENCY_ID, P.PAYLOAD, P.TS_MS) VALUES (PS.AFTER_ID, PS.AFTER_AGENCY_ID, PS.PAYLOAD, PS.TS_MS);

EXPLAIN plan

GlobalStats:
partitionsTotal=742
partitionsAssigned=742
bytesAssigned=3596441600
Operations:
1:0     ->Result  number of rows inserted, number of rows updated, number of rows deleted  
1:1          ->WindowFunction  ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST)  
1:2               ->LeftOuterJoin  joinFilter: ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id'))))) OR ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id')))))  
1:3                    ->Filter  ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST) = 1  
1:4                         ->UnionAll  
1:5                              ->Filter  CHANGES.A_METADATA$ACTION IS NOT NULL  
1:6                                   ->WithReference  
1:7                                        ->WithClause  CHANGES  
1:8                                             ->Filter  (A.METADATA$SHORTNAME IS NULL) OR (D.METADATA$SHORTNAME IS NULL) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_METADATA, SCAN_FDN_FILES.RECORD_METADATA))) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_CONTENT, SCAN_FDN_FILES.RECORD_CONTENT)))  
1:9                                                  ->FullOuterJoin  joinKey: (D.METADATA$ROW_ID = A.METADATA$ROW_ID) AND (D.METADATA$SHORTNAME = A.METADATA$SHORTNAME)  
1:10                                                       ->TableScan  DATABASE.RAW.MESSAGE as SCAN_FDN_FILES  METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER  {partitionsTotal=17, partitionsAssigned=17, bytesAssigned=20623360}
1:11                                                       ->TableScan  DATABASE.RAW.MESSAGE as SCAN_FDN_FILES  METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER  {partitionsTotal=507, partitionsAssigned=507, bytesAssigned=3519694336}
1:12                              ->Filter  CHANGES.D_METADATA$ACTION IS NOT NULL  
1:13                                   ->WithReference  
1:14                    ->TableScan  DATABASE.TARGET.MESSAGE as P  ID, AGENCY_ID  {partitionsTotal=218, partitionsAssigned=218, bytesAssigned=56123904}

Query profile

[Query profile screenshot]

I have reformatted the SQL, just to make it more readable for me:

MERGE INTO target.message AS p
USING (
SELECT 
record_content:payload:before.id::VARCHAR          AS before_id,
record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
record_content:payload:after.id::VARCHAR           AS after_id,
record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
record_content:payload:after::VARIANT              AS payload,
record_content:payload:source.ts_ms::INT           AS ts_ms,
record_content:payload:op::VARCHAR                 AS op,
COALESCE(after_id, before_id) AS id_a,
COALESCE(after_agency_id, before_agency_id) AS id_b
FROM raw.message_stream
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1
) AS ps 
ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
(p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd' 
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') 
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

I don't see anything particularly terrible here. I moved the COALESCE of the two values used in the QUALIFY into the SELECT, just so I could read it more easily.

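But look at the ON logic: you are prepared to match on the before values if the after values don't match, yet that is mixed with the COALESCE logic. Are the two after values always null at the same time? That is, if after_id is null, will after_agency_id also be null? Because if so, and you also don't need to check "before" when "after" is non-null but doesn't match, then you could use: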

ON p.id = ps.id_a AND p.agency_id = ps.id_b

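You might want to give them better names, though. This should improve things somewhat: in the EXPLAIN plan above, the OR condition shows up only as a joinFilter on the LeftOuterJoin, with no joinKey, whereas a plain equality condition can be used as a join key.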

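Going back to the JOIN logic, another reason I think the above may apply is that you are grouping/partitioning the ROW_NUMBER by the after values (when they exist). That means if you have rows with the same after values but different before values, all but one of them are already dropped by the current ROW_NUMBER, so their before values never reach the join anyway.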
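Both assumptions (the two after key parts are NULL together, and keys never change between the before and after images) can be checked against the stream before switching to the equi-join. A sketch using the same column paths as above; a plain SELECT on a stream does not advance its offset, so running it does not consume the backlog. If both counts come back as 0, the before branch of the OR condition should never be the one that produces a match.

```sql
WITH changes AS (
  -- same envelope parsing as in the MERGE
  SELECT
    record_content:payload:before.id::VARCHAR        AS before_id,
    record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
    record_content:payload:after.id::VARCHAR         AS after_id,
    record_content:payload:after.agency_id::VARCHAR  AS after_agency_id
  FROM raw.message_stream
)
SELECT
  -- exactly one of the two "after" key parts is NULL
  COUNT_IF((after_id IS NULL AND after_agency_id IS NOT NULL)
        OR (after_id IS NOT NULL AND after_agency_id IS NULL)) AS half_null_after_keys,
  -- the key changed between before and after (both images present)
  COUNT_IF(before_id IS NOT NULL AND after_id IS NOT NULL
           AND (NOT EQUAL_NULL(before_id, after_id)
                OR NOT EQUAL_NULL(before_agency_id, after_agency_id))) AS changed_keys
FROM changes;
```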

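But other than that, it doesn't look like it's doing anything "really bad". At that point, you might want to run a warehouse 4 to 8 times larger, let it run for 24/8 hours (the 24-hour budget divided by the size factor), and see whether it finishes within that plus about 10% extra time. The cost of the larger warehouse should be offset by the shorter wall-clock time.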
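The cost argument can be made concrete. Standard Snowflake warehouses are billed at 4 credits per hour for Medium, and each size step doubles that, so 4 times larger is X-Large (16 credits/hour) and 8 times larger is 2X-Large (32 credits/hour). Assuming, optimistically, that the MERGE speeds up in proportion to warehouse size:

```latex
\begin{aligned}
\text{Medium:} \quad & 4~\tfrac{\text{credits}}{\text{h}} \times 24~\text{h} = 96~\text{credits} \\
\text{X-Large } (4\times)\text{:} \quad & 16~\tfrac{\text{credits}}{\text{h}} \times \tfrac{24}{4}~\text{h} = 96~\text{credits} \\
\text{2X-Large } (8\times)\text{:} \quad & 32~\tfrac{\text{credits}}{\text{h}} \times \tfrac{24}{8}~\text{h} = 96~\text{credits} \\
\text{2X-Large, plus 10\%:} \quad & 32~\tfrac{\text{credits}}{\text{h}} \times 3.3~\text{h} = 105.6~\text{credits}
\end{aligned}
```

By the same arithmetic, the Medium run that timed out after 24 hours had already consumed about 96 credits without finishing.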

Silly idea:

On a smaller data set, as you mentioned, try this very plainly written version of the SQL:

MERGE INTO target.message AS p
USING (
SELECT 
b.before_id,
b.before_agency_id,
b.after_id,
b.after_agency_id,
b.payload,
b.ts_ms,
b.op
FROM (
-- number the changes per key, newest first
SELECT 
a.*
,COALESCE(a.after_id, a.before_id) AS id_a
,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
FROM (
-- parse the Debezium envelope once
SELECT 
record_content:payload:before.id::VARCHAR          AS before_id,
record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
record_content:payload:after.id::VARCHAR           AS after_id,
record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
record_content:payload:after::VARIANT              AS payload,
record_content:payload:source.ts_ms::INT           AS ts_ms,
record_content:payload:op::VARCHAR                 AS op
FROM raw.message_stream
) AS a
) AS b
WHERE b.rn = 1
) AS ps 
ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
(p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd' 
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') 
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

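And here is the version with the join change that I suspect will work for you. Run it on a cloned table, just to see the performance impact: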

MERGE INTO target.message AS p
USING (
SELECT 
--b.before_id,
--b.before_agency_id,
b.after_id,
b.after_agency_id,
b.payload,
b.ts_ms,
b.op,
b.id_a,
b.id_b
FROM (
-- number the changes per key, newest first
SELECT 
a.*
,COALESCE(a.after_id, a.before_id) AS id_a
,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
FROM (
-- parse the Debezium envelope once
SELECT 
record_content:payload:before.id::VARCHAR          AS before_id,
record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
record_content:payload:after.id::VARCHAR           AS after_id,
record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
record_content:payload:after::VARIANT              AS payload,
record_content:payload:source.ts_ms::INT           AS ts_ms,
record_content:payload:op::VARCHAR                 AS op
FROM raw.message_stream
) AS a
) AS b
WHERE b.rn = 1
) AS ps 
ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd' 
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') 
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

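Another thing to try, given the size of the backlog:

Split the task into steps. For now, make a table (permanent, but think of it as temporary) from the first half: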

CREATE TABLE perm_but_call_temp_table AS
SELECT 
record_content:payload:before.id::VARCHAR          AS before_id,
record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
record_content:payload:after.id::VARCHAR           AS after_id,
record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
record_content:payload:after::VARIANT              AS payload,
record_content:payload:source.ts_ms::INT           AS ts_ms,
record_content:payload:op::VARCHAR                 AS op,
COALESCE(after_id, before_id) AS id_a,
COALESCE(after_agency_id, before_agency_id) AS id_b
FROM raw.message_stream
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1;

Then merge that into the main table:

MERGE INTO target.message AS p
USING perm_but_call_temp_table AS ps 
ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd' 
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') 
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
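If splitting the work helps, the two steps could later run as a pair of chained tasks. This is a minimal sketch with hypothetical task names (`raw.stage_message_changes`, `raw.merge_message_changes`) and a hypothetical warehouse `my_wh`. It swaps the CREATE TABLE ... AS for INSERT OVERWRITE so the same staging table is refilled on every run, and it assumes perm_but_call_temp_table already exists with the columns produced by the query above.

```sql
-- Step 1 (root task): INSERT OVERWRITE replaces the staging rows and,
-- being DML, consumes the stream. Names here are hypothetical.
CREATE OR REPLACE TASK raw.stage_message_changes
  WAREHOUSE = my_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('RAW.MESSAGE_STREAM')
AS
INSERT OVERWRITE INTO perm_but_call_temp_table
SELECT
  record_content:payload:before.id::VARCHAR        AS before_id,
  record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
  record_content:payload:after.id::VARCHAR         AS after_id,
  record_content:payload:after.agency_id::VARCHAR  AS after_agency_id,
  record_content:payload:after::VARIANT            AS payload,
  record_content:payload:source.ts_ms::INT         AS ts_ms,
  record_content:payload:op::VARCHAR               AS op,
  COALESCE(after_id, before_id)                    AS id_a,
  COALESCE(after_agency_id, before_agency_id)      AS id_b
FROM raw.message_stream
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC) = 1;

-- Step 2 (child task): runs only after step 1 completes successfully.
CREATE OR REPLACE TASK raw.merge_message_changes
  WAREHOUSE = my_wh
  AFTER raw.stage_message_changes
AS
MERGE INTO target.message AS p
USING perm_but_call_temp_table AS ps
ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd' THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') THEN INSERT (id, agency_id, payload, ts_ms)
  VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

-- Tasks are created suspended; resume the child before the root.
ALTER TASK raw.merge_message_changes RESUME;
ALTER TASK raw.stage_message_changes RESUME;
```

One caveat with this design: if the MERGE task fails, the next run of the first task overwrites the staging table, and the changes it held are lost because the stream has already moved past them. A more defensive version would only clear the staging table after a successful MERGE.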

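This will give you an idea of where the problem is: the first operation or the second. It also lets you merge into a clone and test whether the equi-join version runs faster and gives the same results.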
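One way to run that comparison is to merge the staging table into two zero-copy clones of the target, one per join condition. This is a sketch with hypothetical clone names. Because both MERGE statements read perm_but_call_temp_table rather than the stream, they see identical input and do not consume the stream. The final check assumes an order-independent HASH_AGG over every column is a good enough equality test; the elapsed time of each MERGE can be read from the query history.

```sql
-- Zero-copy clones: cheap to create and safe to modify (names are hypothetical)
CREATE OR REPLACE TABLE target.message_or_join CLONE target.message;
CREATE OR REPLACE TABLE target.message_equi_join CLONE target.message;

-- Original OR-join condition
MERGE INTO target.message_or_join AS p
USING perm_but_call_temp_table AS ps
ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id)
OR (p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd' THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') THEN INSERT (id, agency_id, payload, ts_ms)
  VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

-- Equi-join on the COALESCEd keys
MERGE INTO target.message_equi_join AS p
USING perm_but_call_temp_table AS ps
ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd' THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') THEN INSERT (id, agency_id, payload, ts_ms)
  VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

-- Same row count and same hash suggest both variants produced the same table
SELECT 'or_join' AS variant, COUNT(*) AS row_count, HASH_AGG(*) AS content_hash
FROM target.message_or_join
UNION ALL
SELECT 'equi_join', COUNT(*), HASH_AGG(*)
FROM target.message_equi_join;
```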

Latest update