我们正在尝试运行分布在docker swarm集群上的简单管道。luigi 工作线程部署为复制的 docker 服务。他们成功启动,在向 luigi-server 请求工作几秒钟后,由于没有分配给他们工作,他们开始死亡,所有任务最终都分配给了一个工人。
我们不得不在路易吉.cfg的工人中设置keep_alive=True,以迫使他们不要死,但是在管道完成后让工人留在身边似乎是一个坏主意。有没有办法控制工作分配?
我们的测试管道:
class RunAllTasks(luigi.Task):
tasks = luigi.IntParameter()
sleep_time = luigi.IntParameter()
def requires(self):
for i in range(self.tasks):
yield RunExampleTask(i, self.sleep_time)
def run(self):
with self.output().open('w') as f:
f.write('All done!')
def output(self):
return LocalTarget('/data/RunAllTasks.txt')
class RunExampleTask(luigi.Task):
number = luigi.IntParameter()
sleep_time = luigi.IntParameter()
@property
def cmd(self):
return """
docker run --rm --name example_{number} hello-world
""".format(number=self.number)
def run(self):
time.sleep(self.sleep_time)
logger.debug(self.cmd)
out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
logger.debug(out)
with self.output().open('w') as f:
f.write(str(out))
def output(self):
return LocalTarget('/data/{number}.txt'.format(number=self.number))
if __name__ == "__main__":
luigi.run()
您的问题是一次yield
一个要求的结果,而是您希望一次yield
所有要求,如下所示:
def requires(self):
reqs = []
for i in range(self.tasks):
reqs.append(RunExampleTask(i, self.sleep_time))
yield reqs