Python script for Avro conversion using Hadoop Streaming



I have a 10 GB input file that I am trying to convert to Avro using Python with Hadoop Streaming. The job succeeds, but I cannot read the output with an Avro reader.

It gives: 'utf8' codec can't decode byte 0xb4 in position 13924: invalid start byte.

The issue here is that I write the mapper's output to stdout for Hadoop Streaming; if I point the writer at a file name instead and run the script locally, the Avro output is readable.

Any ideas how to solve this? I think the problem is around how streaming handles the keys/values in the stream...
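One way to confirm that the streaming machinery, rather than the writer, mangled the container is to check the file header: every Avro object container file starts with the three ASCII bytes 'Obj' followed by 0x01. A minimal diagnostic sketch (the part file name is hypothetical):

# diagnostic sketch: verify the Avro container magic of the job output
MAGIC = 'Obj\x01'  # first four bytes of every Avro object container file

with open('part-00000', 'rb') as f:  # hypothetical part file from the job
    head = f.read(4)
if head == MAGIC:
    print 'header looks like a valid Avro container'
else:
    print 'corrupted header: %r' % head

The streaming job is launched as: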

hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
                      -input "xxx.txt" \
                      -mapper "/opt/anaconda/anaconda21/bin/python mapper.py x.avsc" \
                      -reducer NONE \
                      -output "xxxxx" \
                      -file "mapper.py" \
                      -lazyOutput \
                      -file "x.avsc"

The mapper script is:

import sys
from avro import schema, datafile
import avro.io as io

# parse the schema shipped with the job (-file "x.avsc")
schema_str = open("xxxxx.avsc", 'r').read()
SCHEMA = schema.parse(schema_str)
rec_writer = io.DatumWriter(SCHEMA)
# write the Avro container straight to the streaming stdout
df_writer = datafile.DataFileWriter(sys.stdout, rec_writer, SCHEMA)
header = []
for field in SCHEMA.fields:
    header.append(field.name)
for line in sys.stdin:
    # one \x01-delimited input field per schema column
    fields = line.rstrip().split("\x01")
    data = dict(zip(header, fields))
    try:
        df_writer.append(data)
    except Exception, e:
        # report errors on stderr; printing to stdout would corrupt the binary stream
        print >> sys.stderr, "failed with data: %s" % str(data)
        print >> sys.stderr, str(e)
df_writer.close()
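For comparison, the same writer logic works when pointed at a real file instead of sys.stdout, which matches the observation above that the output is readable when the script runs locally. A minimal local sketch (sample.avro and the all-string record are assumptions; the \x01-split mapper above also treats every field as a string):

import avro.io as io
from avro import schema, datafile

SCHEMA = schema.parse(open('x.avsc').read())
writer = datafile.DataFileWriter(open('sample.avro', 'wb'),
                                 io.DatumWriter(SCHEMA), SCHEMA)
writer.append(dict((f.name, 'test') for f in SCHEMA.fields))  # hypothetical record
writer.close()

# reading the local file back succeeds, unlike the streaming output
reader = datafile.DataFileReader(open('sample.avro', 'rb'), io.DatumReader(SCHEMA))
for record in reader:
    print record
reader.close()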

Finally managed to solve this. Use an output format class and leave the Avro binary conversion to it; in the streaming mapper, just emit JSON records.

hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
              -libjars avro-json-1.2.jar \
              -jobconf output.schema.url=hdfs:///x.avsc \
              -input "xxxxx" \
              -mapper "/opt/anaconda/anaconda21/bin/python mapper.py x.avsc" \
              -reducer NONE \
              -output "/xxxxx" \
              -outputformat com.cloudera.science.avro.streaming.AvroAsJSONOutputFormat \
              -lazyOutput \
              -file "mapper.py" \
              -file "x.avsc"

Here is mapper.py:

import sys
import json
from avro import schema

# the schema is only needed here to recover the field order
schema_str = open("xxxxx.avsc", 'r').read()
SCHEMA = schema.parse(schema_str)
header = []
for field in SCHEMA.fields:
    header.append(field.name)
for line in sys.stdin:
    fields = line.rstrip().split("\x01")
    data = dict(zip(header, fields))
    try:
        # emit one JSON record per line; AvroAsJSONOutputFormat handles the binary encoding
        print json.dumps(data, encoding='ISO-8859-1')
    except Exception, e:
        # keep diagnostics on stderr so they are not emitted as records
        print >> sys.stderr, "failed with data: %s" % str(data)
        print >> sys.stderr, str(e)
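The mapper's JSON output can be smoke-tested locally before submitting the job. A small sketch (the sample row values are made up) that runs one \x01-delimited line through the same zip/dumps logic and checks that it parses back:

import json
from avro import schema

SCHEMA = schema.parse(open('x.avsc').read())
header = [f.name for f in SCHEMA.fields]

row = '\x01'.join('value%d' % i for i in range(len(header)))  # fake input line
emitted = json.dumps(dict(zip(header, row.split('\x01'))), encoding='ISO-8859-1')
print emitted  # exactly one JSON object per line, as the output format expects
assert isinstance(json.loads(emitted), dict)  # round-trips as valid JSON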
