由于路易吉的工作分配不均,工人过早死亡(2.6.1)



我们正在尝试运行分布在docker swarm集群上的简单管道。luigi 工作线程部署为复制的 docker 服务。他们成功启动,在向 luigi-server 请求工作几秒钟后,由于没有分配给他们工作,他们开始死亡,所有任务最终都分配给了一个工人。

我们不得不在路易吉.cfg的工人中设置keep_alive=True,以迫使他们不要死,但是在管道完成后让工人留在身边似乎是一个坏主意。有没有办法控制工作分配?

我们的测试管道:

class RunAllTasks(luigi.Task):
    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()
    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i, self.sleep_time)
    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')
    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')

class RunExampleTask(luigi.Task):
    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()
    @property
    def cmd(self):
        return """
               docker run --rm --name example_{number} hello-world
           """.format(number=self.number)
    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))
    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))

if __name__ == "__main__":
    luigi.run()

您的问题是一次yield一个要求的结果,而是您希望一次yield所有要求,如下所示:

def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i, self.sleep_time))
    yield reqs

最新更新