使用bobobo etl加载多个文件



我是bonobo-etl的新手,我正在尝试编写一个同时加载多个文件的作业,但我无法让CsvReader使用@use_context_processor注释。我的代码片段:

def input_file(self, context):
yield 'test1.csv'
yield 'test2.csv'
yield 'test3.csv'
@use_context_processor(input_file)
def extract(f):
return bonobo.CsvReader(path=f,delimiter='|')
def load(*args):
print(*args)
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(extract,load)
return graph

当我运行作业时,我会得到类似<bonobo.nodes.io.csv.CsvReader object at 0x7f849678dc88>的东西,而不是CSV的行。

如果我像graph.add_chain(bonobo.CsvReader(path='test1.csv',delimiter='|'),load)那样对阅读器进行硬编码,它就会工作。

如有任何帮助,我们将不胜感激。

谢谢。

作为倭黑猩猩。CsvReader不支持(目前(从输入流中读取文件名,您需要使用自定义读取器。

以下是一个适用于我的一组csv的解决方案:

import bonobo
import bonobo.config
import bonobo.util
import glob
import csv
@bonobo.config.use_context
def read_multi_csv(context, name):
with open(name) as f:
reader = csv.reader(f, delimiter=';')
headers = next(reader)
if not context.output_type:
context.set_output_fields(headers)
for row in reader:
yield tuple(row)
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(
glob.glob('prenoms_*.csv'),
read_multi_csv,
bonobo.PrettyPrinter(),
)
return graph

if __name__ == '__main__':
with bonobo.parse_args() as options:
bonobo.run(get_graph(**options))

对这个片段的评论很少,按阅读顺序:

  • use_context装饰器将把节点执行上下文注入到转换调用中,允许使用第一个csv头使用.set_output_fields(...)
  • 其他csv标头被忽略,在我的情况下,它们都是一样的。对于您自己的案例,您可能需要稍微复杂一点的逻辑
  • 然后,我们只需使用glob.globbonobo.Graph实例中生成文件名(在我的情况下,流将包含:prenoms_2004.csv prenoms_2005.csv…prenoms_2011.csv preoms_2012.csv(,并将其传递给我们的自定义读取器,该读取器将为每个文件调用一次,打开它,并生成其行

希望能有所帮助!

最新更新