MapReduce to copy output files so they can be placed in different tables in Hive



I have one MASTER table and two other tables in Hive:

Master table: MsgId, NbOfTxs, InitgPty, PmtInf, DbtrAcct
Sub Master Table 1: MsgId, NbOfTxs, DbtrAcct
Sub Master Table 2: MsgId, NbOfTxs, InitgPty

The data is in XML format and I have already written MapReduce code to parse it. I want to produce separate part-r files so that the output can be loaded directly into the corresponding tables in Hive.

How can I get the MapReduce OUTPUT files into Hive so that they load the corresponding tables directly, or is there a better way to get these files into the Hive tables?

The code is below:

package xmlcsvMR;
import javax.xml.stream.XMLStreamConstants;
import java.io.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import javax.xml.stream.*;
public class XmlParser11
{
        public static class XmlInputFormat1 extends TextInputFormat {
        public static final String START_TAG_KEY = "xmlinput.start";
        public static final String END_TAG_KEY = "xmlinput.end";

        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            return new XmlRecordReader();
        }
        /**
         * XMLRecordReader class to read through a given xml document to output
         * xml blocks as records as specified by the start tag and end tag
         *
         */
        public static class XmlRecordReader extends
                RecordReader<LongWritable, Text> {
            private byte[] startTag;
            private byte[] endTag;
            private long start;
            private long end;
            private FSDataInputStream fsin;
            private DataOutputBuffer buffer = new DataOutputBuffer();
            private LongWritable key = new LongWritable();
            private Text value = new Text();
                @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
                endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
                FileSplit fileSplit = (FileSplit) split;
                // open the file and seek to the start of the split
                start = fileSplit.getStart();
                end = start + fileSplit.getLength();
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                fsin = fs.open(fileSplit.getPath());
                fsin.seek(start);
            }
        @Override
            public boolean nextKeyValue() throws IOException,
                    InterruptedException {
                if (fsin.getPos() < end) {
                    if (readUntilMatch(startTag, false)) {
                        try {
                            buffer.write(startTag);
                            if (readUntilMatch(endTag, true)) {
                                key.set(fsin.getPos());
                                value.set(buffer.getData(), 0,
                                        buffer.getLength());
                                return true;
                            }
                        } finally {
                            buffer.reset();
                        }
                    }
                }
                return false;
            }
        @Override
           public LongWritable getCurrentKey() throws IOException,
                    InterruptedException {
                return key;
            }
        @Override
            public Text getCurrentValue() throws IOException,
                    InterruptedException {
                return value;
            }
        @Override
            public void close() throws IOException {
                fsin.close();
            }
        @Override
            public float getProgress() throws IOException {
                return (fsin.getPos() - start) / (float) (end - start);
            }
            private boolean readUntilMatch(byte[] match, boolean withinBlock)
                    throws IOException {
                int i = 0;
                while (true) {
                    int b = fsin.read();
                    // end of file:
                    if (b == -1)
                        return false;
                    // save to buffer:
                    if (withinBlock)
                        buffer.write(b);
                    // check if we're matching:
                    if (b == match[i]) {
                        i++;
                        if (i >= match.length)
                            return true;
                    } else
                        i = 0;
                    // see if we've passed the stop point:
                    if (!withinBlock && i == 0 && fsin.getPos() >= end)
                        return false;
                }
            }
        }
    }

        public static class Map extends Mapper<LongWritable, Text,
    Text, Text> {
  @Override
  protected void map(LongWritable key, Text value,
                     Context context)
      throws
      IOException, InterruptedException {
    String document = value.toString();
    System.out.println("'" + document + "'");
        try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(new
              ByteArrayInputStream(document.getBytes()));
      String propertyName = "";
      String propertyValue = "";
      String currentElement = "";
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case XMLStreamConstants.START_ELEMENT: //START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case XMLStreamConstants.CHARACTERS:  //CHARACTERS:
            if (currentElement.equalsIgnoreCase("MsgId")) {
              propertyName += reader.getText();
              //System.out.println(propertyName);
            } else if (currentElement.equalsIgnoreCase("NbOfTxs")) {
              propertyValue += reader.getText();
              //System.out.println(propertyValue);
            }
            break;
        }
      }
      reader.close();
      context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));
    }
        catch(Exception e){
                throw new IOException(e);
                }
  }
}
public static class Reduce
    extends Reducer<Text, Text, Text, Text> {
private Text outputKey = new Text();
  public void reduce(Text key, Iterable<Text> values,
                     Context context)
      throws IOException, InterruptedException {
  for (Text value : values) {
     outputKey.set(constructPropertyXml(key, value));
      context.write(outputKey, null);
    }
  }
 public static String constructPropertyXml(Text name, Text value) {
    StringBuilder sb = new StringBuilder();
    sb.append("MsgID ").append(name)
        .append("  NbOfTxs ").append(value);
        return sb.toString();
  }

}

        public static void main(String[] args) throws Exception
        {
                Configuration conf = new Configuration();
                conf.set("xmlinput.start", "<Event>");
                conf.set("xmlinput.end", "</Event>");
                conf.set("mapred.textoutputformat.separatorText", ",");
                Job job = new Job(conf);
                job.setJarByClass(XmlParser11.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setMapperClass(XmlParser11.Map.class);
                job.setReducerClass(XmlParser11.Reduce.class);
                job.setInputFormatClass(XmlInputFormat1.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                job.waitForCompletion(true);
        }
}

Try using MultipleOutputs. It lets you write to several different files, so you can produce a separate copy of the output for each table you want to load into Hive.
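
The named outputs have to be registered on the Job before it is submitted. A minimal driver-side sketch, assuming one named output per Hive table (the names master, sub1 and sub2 are illustrative, not something in your current code):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// In main(), after the existing job setup: declare one named output per Hive table
MultipleOutputs.addNamedOutput(job, "master", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "sub1", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "sub2", TextOutputFormat.class, Text.class, Text.class);
// Optional: use LazyOutputFormat instead of job.setOutputFormatClass(TextOutputFormat.class)
// so that no empty default part-r-* files are created when everything
// is written through MultipleOutputs.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);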

A good example of this with Hadoop 1.0.2 is here.

Here is the example taken from the javadocs:

Usage in Reducer:
 <K, V> String generateFileName(K k, V v) {
   return k.toString() + "_" + v.toString();
 }
 public class MOReduce extends
   Reducer<WritableComparable, Writable,WritableComparable, Writable> {
 private MultipleOutputs mos;
 public void setup(Context context) {
 ...
 mos = new MultipleOutputs(context);
 }
 public void reduce(WritableComparable key, Iterable<Writable> values,
 Context context)
 throws IOException, InterruptedException {
 ...
 mos.write("text", , key, new Text("Hello"));
 mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
 mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
 mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 ...
 }
 public void cleanup(Context context) throws IOException, InterruptedException {
 mos.close();
 ...
 }
 }
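
Adapted to your tables, the reducer could look roughly like the sketch below. It assumes the mapper is extended to emit all the parsed fields (not just NbOfTxs) as one comma-separated value in the order NbOfTxs,InitgPty,PmtInf,DbtrAcct, keyed by MsgId; that field order and the named outputs master/sub1/sub2 are assumptions for illustration, not part of your current code:

// Drop-in replacement for the Reduce nested class in XmlParser11;
// additionally needs: import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public static class MultiTableReduce extends Reducer<Text, Text, Text, Text> {
    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // value is assumed to be "NbOfTxs,InitgPty,PmtInf,DbtrAcct"
            String[] f = value.toString().split(",", -1);
            // MASTER table: MsgId,NbOfTxs,InitgPty,PmtInf,DbtrAcct
            mos.write("master", key, value, "master/part");
            // Sub Master Table 1: MsgId,NbOfTxs,DbtrAcct
            mos.write("sub1", key, new Text(f[0] + "," + f[3]), "sub1/part");
            // Sub Master Table 2: MsgId,NbOfTxs,InitgPty
            mos.write("sub2", key, new Text(f[0] + "," + f[1]), "sub2/part");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close(); // must be called, otherwise the named output files are never flushed
    }
}

Because a baseOutputPath such as "master/part" is passed to write(), each named output lands in its own subdirectory of the job output (for example <output>/master/part-r-00000), so every Hive table gets its own clean set of files. With the TextOutputFormat separator set to ",", each directory can then be loaded with a plain LOAD DATA INPATH statement (for example LOAD DATA INPATH '<output>/master' INTO TABLE master_table, with your actual table names), assuming the tables are defined as text tables with ',' as the field delimiter; alternatively, you can create them as EXTERNAL tables whose LOCATION points at those directories so no extra copy is needed.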
