什么定义了"complete"路易吉任务?



我正在构建我的第一个 Luigi 管道,目前在构建依赖项之前我正在单独测试任务。在测试期间,我使用以下主要方法的版本来构建任务:

if __name__ == "__main__":
headers = dict()
headers["Content-Type"] = "application/json"
headers["Accept"] = "application/json"
luigi.build[(CSVValidator(jsonfile = '/sample_input/sample_csv.json',
docfile = None,
error_limit = 2,
order_fields = 3,
output_file = 'validation_is_us.txt',
header = headers)])
luigi.run()

这是我的csv_validator的样子:

class CSVValidator(luigi.Task):
jsonfile = luigi.Parameter()
docfile = luigi.Parameter()
error_limit = luigi.Parameter()
order_fields = luigi.Parameter()
output_file = luigi.Parameter()
header = luigi.DictParameter()
def output(self):
return luigi.LocalTarget(self.output_file + "/csv_validator_data_%s.txt" % time.time())
def run(self):
output_file = self.output().open('w')
files = {}
data = {}
files["jsonfile"] = open(self.jsonfile, 'rb')
files["docfile"] = open(self.docfile, 'rb')
data["error_limit"] = self.error_limit
data["order_fields"] = self.order_fields
r = requests.post(*****~~~~~*****~~~~~,
headers=headers,
data=data, files=files)
task_response = r.text.encode(encoding="UTF-8")
print type(task_response)
print(task_response)
jsontaskdata = json.loads(task_response)
json.dump(jsontaskdata, output_file)
print("validated")
output_file.close()

但是,此任务从未实际运行。相反,luigi 中央调度程序声称此任务已经完成:

===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
- 1 CSVValidator(...)
* 1 ran successfully:
- 1 Downloader(...)

这种进展看起来:)因为没有失败的任务或缺少依赖项

我创建的其他任务,例如下载器,每次都能成功运行。这里定义完整任务的定义是什么?我不明白这是什么意思。

谢谢你的时间!

输出方法返回的 Target-对象定义任务是否完成。

如果某个输出文件已存在,或者存在某些其他条件(包括外部资源的可用性(,则可能会创建该对象。例如,在luigi.contrib.esindex.py中,(检查(某个远程集群中是否存在 ElasticSearch 索引将创建 Target-对象并告诉您任务 (CopyIndex( 已完成。

您可能还想看看这个答案:https://stackoverflow.com/a/34638943/4125622

而这个讨论:https://github.com/spotify/luigi/issues/595

最新更新