Hadoop mapreduce中的XML解析



我编写了一个mapreduce代码,用于将XML解析为CSV。但是在运行作业后,我没有在我的输出目录中找到任何输出。我不确定文件是否未被读取或未被写入。

这是我的全部代码。

public class XmlParser11
{
        public static String outvalue;
        public static class XmlInputFormat1 extends TextInputFormat {
        public static final String START_TAG_KEY = "xmlinput.start";
        public static final String END_TAG_KEY = "xmlinput.end";
        
        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            return new XmlRecordReader();
        }
        public static class XmlRecordReader extends
                RecordReader<LongWritable, Text> {
            private byte[] startTag;
            private byte[] endTag;
            private long start;
            private long end;
            private FSDataInputStream fsin;
            private DataOutputBuffer buffer = new DataOutputBuffer();
            private LongWritable key = new LongWritable();
            private Text value = new Text();
                @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                    System.out.println("B");
                Configuration conf = context.getConfiguration();
                startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
                endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
                FileSplit fileSplit = (FileSplit) split;
                // open the file and seek to the start of the split
                start = fileSplit.getStart();
                end = start + fileSplit.getLength();
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                fsin = fs.open(fileSplit.getPath());
                fsin.seek(start);
            }
        @Override
            public boolean nextKeyValue() throws IOException,
                    InterruptedException {
            System.out.println("C");
                if (fsin.getPos() < end) {
                    if (readUntilMatch(startTag, false)) {
                        try {
                            buffer.write(startTag);
                            if (readUntilMatch(endTag, true)) {
                                key.set(fsin.getPos());
                                value.set(buffer.getData(), 0,
                                        buffer.getLength());
                                return true;
                            }
                        } finally {
                            buffer.reset();
                        }
                    }
                }
                return false;
            }
        @Override
           public LongWritable getCurrentKey() throws IOException,
                    InterruptedException {
                return key;
            }
        @Override
            public Text getCurrentValue() throws IOException,
                    InterruptedException {
            
                return value;
            }
        @Override
            public void close() throws IOException {
                fsin.close();
            }
        @Override
            public float getProgress() throws IOException {
            
                return (fsin.getPos() - start) / (float) (end - start);
            }
            private boolean readUntilMatch(byte[] match, boolean withinBlock)
                    throws IOException {
                int i = 0;
                
                while (true) {
                    int b = fsin.read();
                    // end of file:
                    if (b == -1)
                        return false;
                    // save to buffer:
                    if (withinBlock)
                        buffer.write(b);
                    // check if we're matching:
                    if (b == match[i]) {
                        i++;
                        if (i >= match.length)
                            return true;
                    } else
                        i = 0;
                    // see if we've passed the stop point:
                    if (!withinBlock && i == 0 && fsin.getPos() >= end)
                        return false;
                }
            }
        }
    }

        public static class Map extends Mapper<Text, Text,
        Text, Text> {
            @SuppressWarnings("unchecked")
            @Override
            protected void map(Text key, Text value,
                     @SuppressWarnings("rawtypes") Mapper.Context context)
                             throws
                             IOException, InterruptedException {
                
                String document = value.toString();
                System.out.println("‘" + document + "‘");
        
                XMLInputFactory xmlif = XMLInputFactory.newInstance();
                XMLStreamReader xmlr;
        
            try {
                xmlr = xmlif.createXMLStreamReader(new FileReader(document));
                while(xmlr.hasNext())
                {
                   printEvent(xmlr);
                   xmlr.next();
                 }
                   xmlr.close();
                   context.write(null,new Text (outvalue));
            } catch (XMLStreamException e) {
                
                e.printStackTrace();
            }
            }
                   private void printEvent(XMLStreamReader xmlr) {
                     
                       switch (xmlr.getEventType()) {
       
                       case XMLStreamConstants.START_ELEMENT:
                          print(xmlr);
                           break;
      
                       case XMLStreamConstants.CHARACTERS:
                           int start = xmlr.getTextStart();
                           int length = xmlr.getTextLength();
                           System.out.print(new String(xmlr.getTextCharacters(),
                                      start,
                                      length));
                           break;
                       }
                   }
                   private  String print(XMLStreamReader xmlr){
                        if(xmlr.hasName()){
                          for (int i=0; i < xmlr.getAttributeCount(); i++) {
                              String localName = xmlr.getLocalName();
                              if (localName != null);
                              String attName = xmlr.getAttributeLocalName(i);
                                String value = xmlr.getAttributeValue(i);
                                System.out.print(",");
                                String outvalue = localName +":"+ attName +"-"+value;
                                System.out.print(outvalue);
                          }
                        } return outvalue;
                      }
                       
  }
        public static void main(String[] args) throws Exception
        {
                Configuration conf = new Configuration();
                conf.set("xmlinput.start", "<FICHER>");
                conf.set("xmlinput.end", "</FICHER>");
                Job job = new Job(conf);
                job.setJarByClass(XmlParser11.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setMapperClass(XmlParser11.Map.class);
                job.setNumReduceTasks(0);
                job.setInputFormatClass(XmlInputFormat1.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                job.waitForCompletion(true);
        }

这是腻子的输出

<>之前文件系统计数器FILE:读取的字节数=0 strong text>FILE: Number of bytes written=120678FILE:读取操作数=0FILE:大读操作数=0FILE:写操作数=0HDFS:读取的字节数=1762671HDFS:写入字节数=0HDFS:读取操作数=5HDFS:大读操作数=0HDFS:写操作数=2柜台工作启动的地图任务=1机架-局部地图任务=1所有地图占用槽位的总时间(ms)=15960所有人在占用的插槽中花费的总时间(ms)=0所有map任务花费的总时间(ms)=3990所有map任务占用的总vcore-秒=3990所有map任务占用的总兆秒数=16343040使用映射-规约模式框架映射输入记录=0映射输出记录=0输入分割字节=124把记录= 0没有打乱= 0合并地图输出=0GC所用时间(ms)=0CPU时间(ms)=1390物理内存(字节)快照=513449984虚拟内存(字节)快照=4122763264总已提交堆使用量(字节)=2058354688文件输入格式计数器字节读= 1762547文件输出格式计数器字节写= 0

问题在开始标签:

conf.set("xmlinput.start", "<FICHER");`
conf.set("xmlinput.end", "</FICHER>"); 

相关内容

  • 没有找到相关文章

最新更新