雪花设置任务依赖项



我有一个任务需要在成功完成不同的前一个任务后执行。

Say for example below three tasks triggers at same time and calls different stored proc.
CREATE TASK myschema.mytask_1
WAREHOUSE = mywh
schedule='USING CRON 0 8 * * MON America/Los_Angeles'
as call myschema.MY_PROC_1();
CREATE TASK myschema.mytask_2
WAREHOUSE = mywh
schedule='USING CRON 0 8 * * MON America/Los_Angeles'
as call myschema.MY_PROC_2();

CREATE TASK myschema.mytask_3
WAREHOUSE = mywh
schedule='USING CRON 0 8 * * MON America/Los_Angeles'
as call myschema.MY_PROC_3();

然而,我希望下面的第4个任务在以上三个任务成功完成后执行。如果他们中的任何一个失败了,第四不应该触发。总之,第四个任务取决于完成以上三个任务。我通读了一些雪花文档,发现只有一个任务可以设置为依赖项。因为现在可以一个接一个地思考下面的问题。此外,我不确定如何评估成功完成先前的任务,以进一步进行。有人可以帮助我实现这是在任何更好的方式。如有任何帮助,我将不胜感激。

CREATE TASK myschema.mytask_1
WAREHOUSE = mywh
schedule='USING CRON 0 8 * * MON America/Los_Angeles'
as call myschema.MY_PROC_1();
CREATE TASK myschema.mytask_2
WAREHOUSE = mywh
AFTER myschema.mytask_1
as call myschema.MY_PROC_2();

CREATE TASK myschema.mytask_3
WAREHOUSE = mywh
AFTER myschema.mytask_2
as call myschema.MY_PROC_3();

CREATE TASK myschema.mytask_4
WAREHOUSE = mywh
AFTER myschema.mytask_3
as call myschema.MY_PROC_4();

虽然Mike Walton建议的Streams解决方案很吸引人,但实现单个存储过程(可能在显式事务中,以便在发生错误时回滚)可能是一个更简单的解决方案,因此也更易于维护。话虽如此,如果性能是关键,您可能希望选择streams选项,因为它保证每个存储过程中的不同代码段并发运行,而Single-SP将顺序运行它们。

如果连续运行任务,则任何失败都将阻止其余任务的执行,并且当下一次计划执行时,它将从头开始并再次执行。根据你的逻辑,这可能不是你想要的行为。

关于第一个选项,这里一个可能的解决方案是利用流来启动第四个任务。这有点不正统,但你可以让它起作用。以下是尝试的基本步骤:

  1. 在成功完成SP后,3个并行任务中的每一个都需要将一条记录插入到单独的表中,因此它必须是SP中的最后一步。
  2. 这3个表中的每一个都需要创建一个STREAM对象。
  3. 你可以安排任务每分钟运行一次,并使用一个WHEN子句,看起来像下面的代码。
  4. 你需要在第4个任务之后执行一些额外的任务,对你的流做一些DML语句,这样流就会被重置。

步骤3示例:

CREATE OR REPLACE TASK mytask_4
WAREHOUSE = xxxx
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('mytask_1_stream') = True
AND SYSTEM$STREAM_HAS_DATA('mytask_2_stream') = True
AND SYSTEM$STREAM_HAS_DATA('mytask_3_stream') = True;

步骤4示例:

CREATE OR REPLACE TASK mytask_5
WAREHOUSE = xxxx
AFTER myschema.mytask_4
INSERT * INTO log_table FROM mytask_1_stream;
CREATE OR REPLACE TASK mytask_6
WAREHOUSE = xxxx
AFTER myschema.mytask_4
INSERT * INTO log_table FROM mytask_2_stream;
CREATE OR REPLACE TASK mytask_7
WAREHOUSE = xxxx
AFTER myschema.mytask_4
INSERT * INTO log_table FROM mytask_3_stream;

就像我说的,这是一个变通方法,但在大多数情况下它应该很好地工作。另外一点,在本例中,mytask_4将永远不会使用任何计算,除非所有3个流都包含数据,这意味着您前面的所有3个任务都已成功执行。否则,该任务将被跳过,等待下一分钟"再次检查"。如果您运行前3个任务的频率较低,那么如果您愿意,也可以将mytask_4安排为运行频率较低。

最新更新