python Luigi的活动处理



我一直在尝试将Luigi集成为我们的工作流处理程序。目前,我们正在使用Concourse,但是我们要做的许多事情都是在大厅里摆脱困境的麻烦,因此我们将其转移到Luigi作为我们的依赖经理。到目前为止,没有问题,工作流程触发并正确执行。

当任务出于任何原因失败时,问题就会出现。该案例特别需要一项任务块,但是所有情况都需要照顾。截至目前,路易吉优雅地照顾了这一错误,并将其写信给Stdout。它仍然排放并退出代码0,这意味着工作通过。假阳性。

我一直在尝试使活动处理以解决此问题,但是即使工作非常简单,我也无法触发它:

@luigi.Task.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    with open('/root/luigi', 'a') as f:
        f.write("we got the exception!") #testing in concourse image
    sys.exit(luigi.retcodes.retcode().unhandled_exception)
class Test(luigi.Task):
    def requires(self):
        raise Exception()
        return []
    def run(self):
        pass
    def output(self):
        return []

然后在python shell中运行命令

luigi.run(main_task_cls=Test, local_scheduler=True)

例外被提高,但甚至没有开火或其他东西。该文件未编写,退出代码仍然为0。

另外,如果它有所不同

[retcode]
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40

我不知所措,为什么事件处理程序不会触发,但是我需要以某种方式失败错误。

似乎问题在于您将" RAISE异常"调用。如果将其放置在需要功能中 - 它基本上在测试任务运行方法之前运行。因此,这不是您的测试任务失败,而是它取决于的任务(现在为空...)。

例如,如果您将加薪移动到运行,则您的代码将按照您的期望。

    def run(self):
       print('start')
       raise Exception()

要处理依赖性失败的情况(在这种情况下,在需求方法中提高了异常),您可以添加另一种类型的luigi事件处理程序,broken_task:luigi.event.broken_task。这将确保Luigi代码排放返回代码(不同于0)。

欢呼!

如果您想在requires()中捕获异常,请使用以下内容:

@luigi.Task.event_handler(luigi.Event.BROKEN_TASK)
def mourn_failure(task, exception):
    ...

如果我正确理解,您只希望Luigi在任务失败时返回错误代码,我对此有很多问题,但是事实证明这很简单,您只是需要在命令行上与Luigi一起运行它,而不是与Python一起运行。这样:

luigi --module my_module MyTask

我不知道这是否也是您的问题,但是我和Python一起跑步,然后Luigi忽略了Luigi.cfg上的retcodes。希望它有帮助。

相关内容

  • 没有找到相关文章