如何在路易吉中使用任务的输出



我有两个路易吉任务。

TaskA运行一个外部程序,结果以 json 的形式存储,并带有luigi.LocalTarget

class TaskA(ExternalProgramTask):
def output(self):
return luigi.LocalTarget(self.outputfile)

在 TaskB 中,我想对 json 进行一些转换并对其进行pickle.dump()。但是我在打开文件时遇到问题。

@inherits(TaskA)
class TaskB(luigi.Task):
def requires(self):
args = {....}
return TaskA(**args)
def run(self):
try: 
entries = json.load(self.input().open())
except json.decoder.JSONDecodeError as e:
logging.error(f"Decoding error: {e}")
return print(e)

但这不起作用,因为我收到解码错误:

Decoding error: Expecting value: line 6 column 1

这是有道理的,如果我尝试打印self.input().open()我会期待 json。但相反,我得到的是:<_io.TextIOWrapper name='task1.output.json' mode='rb' encoding='UTF-8'>

我也尝试使用yield TaskA()但这也没有用。但根据文档,它应该可以工作。

使用 Python 3.8.1 和最新的 luigi 版本。

我发现了这个问题。

事实上,解码错误是有道理的。我正在检查由外部工具生成的input.json,它是一个json。

但是,该工具中有一个错误,并在最后一个条目上附加了一个逗号,这使它成为无效的 json。

最新更新