我是bonobo-etl的新手,我正在尝试编写一个同时加载多个文件的作业,但我无法让CsvReader使用@use_context_processor
注释。我的代码片段:
def input_file(self, context):
yield 'test1.csv'
yield 'test2.csv'
yield 'test3.csv'
@use_context_processor(input_file)
def extract(f):
return bonobo.CsvReader(path=f,delimiter='|')
def load(*args):
print(*args)
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(extract,load)
return graph
当我运行作业时,我会得到类似<bonobo.nodes.io.csv.CsvReader object at 0x7f849678dc88>
的东西,而不是CSV的行。
如果我像graph.add_chain(bonobo.CsvReader(path='test1.csv',delimiter='|'),load)
那样对阅读器进行硬编码,它就会工作。
如有任何帮助,我们将不胜感激。
谢谢。
作为倭黑猩猩。CsvReader不支持(目前(从输入流中读取文件名,您需要使用自定义读取器。
以下是一个适用于我的一组csv的解决方案:
import bonobo
import bonobo.config
import bonobo.util
import glob
import csv
@bonobo.config.use_context
def read_multi_csv(context, name):
with open(name) as f:
reader = csv.reader(f, delimiter=';')
headers = next(reader)
if not context.output_type:
context.set_output_fields(headers)
for row in reader:
yield tuple(row)
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(
glob.glob('prenoms_*.csv'),
read_multi_csv,
bonobo.PrettyPrinter(),
)
return graph
if __name__ == '__main__':
with bonobo.parse_args() as options:
bonobo.run(get_graph(**options))
对这个片段的评论很少,按阅读顺序:
use_context
装饰器将把节点执行上下文注入到转换调用中,允许使用第一个csv头使用.set_output_fields(...)
- 其他csv标头被忽略,在我的情况下,它们都是一样的。对于您自己的案例,您可能需要稍微复杂一点的逻辑
- 然后,我们只需使用
glob.glob
在bonobo.Graph
实例中生成文件名(在我的情况下,流将包含:prenoms_2004.csv prenoms_2005.csv…prenoms_2011.csv preoms_2012.csv(,并将其传递给我们的自定义读取器,该读取器将为每个文件调用一次,打开它,并生成其行
希望能有所帮助!