Hadoop流:Python连接不同的文件



我有一个处理程序,它接受输入数据,处理它并输出数据。在此期间,它生成两个日志IN.log和OUT.log

IN.log包含数据进入的时间和数据的。OUT.log包含处理数据的时间和数据的。所以IN.log包含实时id

OUT.log包含超时id

现在,作为使用python处理hadoop流的一部分,我想将这两个文件连接起来,并附带intime和out时间的diff以及数据的id。

例如:

2秒id123

3秒id112

关于如何使用巨蟒实现这一点,有什么建议吗?

看看运行hadoop作业的MRjob帮助程序包。为这个任务写一个map/reduce会很容易,就像下面的代码一样

from datetime import datetime
from MRJob import MRJob
class JoinJob(MRJob):
    fmt = '%Y-%M-%d'
    def steps(self):
        return [self.mr(mapper=self.mapper, 
                        reducer=self.reducer)]
    def mapper(self, rec_time, rec_id):
        yield rec_id, rec_time
    def reducer(self, rec_id, datetime_strs):
        datetimes = map(lambda x: datetime.strptime(x, self.fmt), 
                            datetime_strs)
        delta_secs = (max(datetimes) - min(datetimes)).total_seconds()
        yield rec_id, delta_secs
if __name__ == '__main__':
    JoinJob.run()

相关内容

  • 没有找到相关文章

最新更新