使动态路易吉任务的失败不严重



>我有一个 luigi 工作流程,它通过 ftp 下载一堆大文件并将它们存放在 s3 上。

我有一个任务读取要下载的文件列表,然后创建一堆实际执行下载的任务

这个想法是,此工作流的结果是一个文件,其中包含已成功下载的列表,任何失败的下载将在第二天的下一次运行时重新尝试。

问题是,如果任何下载任务失败,则永远不会创建成功的下载列表。

这是因为动态创建的任务成为创建它们并从其输出编译列表的主任务的要求。

有没有办法使这些下载任务的失败变得微不足道,以便编译列表减去失败任务的输出?

下面的示例代码,GetFiles 是我们从命令行调用的任务。

class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()
def run(self):
    with self.output().open('w') as output:
        WriteFileFromFtp(sourceUrl, output)
def output(self):
    client = S3Client()
    return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):
def run(self):
    with self.input().open('r') as fileList:
        files = json.load(fileList)
        tasks = []
        taskOutputs = []
        for file in files:
            task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
            tasks.append(task)
            taskOutputs.append(task.output())
        yield tasks
        successfulDownloads = MakeSuccessfulOutputList(taskOutputs)
    with self.output().open('w') as out:
        json.dump(successfulDownloads, out)
def output(self):
    client = S3Client()
    return S3Target(path='successfulDownloads.json', client=client)

几年后,您一定找到了答案,但这里有一些可以提供帮助的东西。

class DownloadFileFromFtp(luigi.Task):
      sourceUrl = luigi.Parameter()
      def run(self):
           with self.output().open('w') as output:
             WriteFileFromFtp(sourceUrl, output)
      
      def on_failure(self, exception):
          #If the task fails for any reason, 
          #then just indicate the task as completed.
          #From the docs, exception is a string, so you can easily.
          if "FileNotFound" in exception:
              return self.complete(ignore=True)
          return self.complete(ignore=False)
      def complete(self, ignore=False):
          return ignore
      def output(self):
          client = S3Client()
          return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

这个答案可能是不正确的 - 检查评论

我已经阅读了几次文档,我没有发现非关键故障之类的迹象。话虽如此,这种行为可以通过在DownloadFileFromFtp中覆盖Task.complete方法轻松实现,同时仍然能够在GetFiles.run中使用DownloadFileFromFtp.output

通过覆盖return True,无论下载是否成功,任务DownloadFileFromFtp都将成功。

class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()
    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(sourceUrl, output)
    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
    def complete(self,):
        return True

但请注意,您还可以在该complete方法中使用更复杂的逻辑 - 例如仅当任务在运行时遇到特定网络故障时才失败。

最新更新