我错过了关于Yelp的mrjob作业库的一些明显的东西。设置MRJob类几乎非常简单。在文件或标准输入上运行它也是如此。但是,如何将作业的输入从本地或s3中的文件更改为s3桶中的键呢?
就像这样。假设我想计算S3桶中以字符串'foo'开头的所有对象:
import re
class MRCountS3Objects(MRJob):
define mapper(self, _, botoS3Key):
if re.match('^foo', botoS3Key.name):
yield 'foo', 1
define reduce(self, name, occurrences):
yield name, sum(occurrences)
这是一个非常做作的例子,但你可能明白我的意思。我如何告诉MRJob在s3对象流上操作,而忽略对象的内容?我看到了s3filessystem .get_s3_keys()方法,它为我提供了我需要的流,但我不确定从那里去哪里。
至少有一种方法可以做到这一点。您的MRJob有一个stdin
属性,可以将其分配给任何迭代器,然后您可以以编程方式运行该作业。例如,这段代码应该处理my-bucket
的键名:
from mrjob.job import MRJob
from mrjob.emr import EMRJobRunner
class MRS3KeyProcessor(MRJob):
# Do some MRJob stuff.
...
def s3_name_generator(bucket):
"""Generator that returns boto.s3.Key names.
"""
# Could also use raw boto here.
emr = EMRJobRunner()
key_stream = emr.fs.get_s3_keys(bucket)
for key in key_stream:
yield key.name
def main():
# The '-' argument signifies that we use stdin.
mr_job = MRCountS3Objects(['--runner', 'inline', '-'])
stdin = s3_name_generator('my-bucket')
mr_job.stdin = stdin
results = []
with mr_job.make_runner() as runner:
runner.run()
for line in runner.stream_output():
key, value = mr_job.parse_output_line(line)
results.append((key, value))
print(results)
if __name__ == '__main__':
main()