Integrating MapReduce with HCatalog, and with Oozie on MapR



I have written a MapReduce program that uses HCatalog to read data from a Hive table and write it to HBase. It is a map-only job with no reducers. I have run the program from the command line and it works as expected (I built a fat jar to avoid jar dependency issues). Now I want to integrate it with Oozie (with the help of Hue). I have two options to run it:

  1. Using a MapReduce action
  2. Using a Java action

My MapReduce program has a driver method that contains the code below:

import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;

public class HBaseValdiateInsertDriver {

    public static void main(String[] args) throws Exception {
        String dbName = "Test";
        String tableName = "emp";
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "HBase Get Put Demo");

        // Read the input from the Hive table via HCatalog
        job.setInputFormatClass(HCatInputFormat.class);
        HCatInputFormat.setInput(job, dbName, tableName, null);

        job.setJarByClass(HBaseValdiateInsertDriver.class);
        job.setMapperClass(HBaseValdiateInsert.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Map-only job, no reducers
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path("maprfs:///user/input"));
        FileOutputFormat.setOutputPath(job, new Path("maprfs:///user/output"));

        job.waitForCompletion(true);
    }
}

How do I specify the driver method in Oozie? All I can see is how to specify the mapper and reducer classes. Can anyone guide me on how to set the required properties?

Using a Java action, I can specify my driver class as the main class and get it to execute, but I run into errors such as "table not found", "HCatalog jars not found", and so on. I include hive-site.xml in the workflow (using Hue), but I feel the system is not picking up those properties. Can anyone tell me what else I need to take care of, and whether there are any other configuration properties I need to include?
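For context, what I expect the generated Java action to boil down to is roughly the following (a simplified sketch; the action name, node names and paths are placeholders, not my actual workflow):

    <action name="hcat-to-hbase">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <!-- hive-site.xml merged into the action configuration -->
            <job-xml>hive-site.xml</job-xml>
            <main-class>HBaseValdiateInsertDriver</main-class>
            <!-- hive-site.xml also shipped to the task working directory -->
            <file>hive-site.xml#hive-site.xml</file>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>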

The sample program I referenced on the Cloudera website also uses

HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
                inputTableName, null));

whereas I use the call below (I do not see a method that accepts the input above):

HCatInputFormat.setInput(job, dbName, tableName, null);

Below is my mapper code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.HCatRecord;

public class HBaseValdiateInsert extends Mapper<WritableComparable, HCatRecord, Text, Text> {

    static HTableInterface table;
    static HTableInterface inserted;
    private String hbaseDate = null;
    String existigValue = null;
    List<Put> putList = new ArrayList<Put>();

    @Override
    public void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        String tablename = "dev_arch186";
        // Utils is a small helper class of mine that manages the HBase connection
        Utils.getHBConnection();
        table = Utils.getTable(tablename);
        table.setAutoFlushTo(false);
    }

    @Override
    public void cleanup(Context context) {
        try {
            // Flush whatever is left in the buffer before closing the table
            table.put(putList);
            table.flushCommits();
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        Utils.closeConnection();
    }

    @Override
    public void map(WritableComparable key, HCatRecord value, Context context)
            throws IOException, InterruptedException {
        String name_hive = (String) value.get(0);
        String id_hive = (String) value.get(1);
        // rec[0] holds the name, rec[1] holds the id read from the Hive record
        String[] rec = { name_hive, id_hive };

        Get g = new Get(Bytes.toBytes(name_hive));
        existigValue = getOneRecord(Bytes.toBytes("Info"), Bytes.toBytes("name"), name_hive);
        if (existigValue.equalsIgnoreCase("NA") || !existigValue.equalsIgnoreCase(id_hive)) {
            Put put = new Put(Bytes.toBytes(rec[0]));
            put.add(Bytes.toBytes("Info"),
                    Bytes.toBytes("name"),
                    Bytes.toBytes(rec[1]));
            put.setDurability(Durability.SKIP_WAL);
            putList.add(put);
            if (putList.size() > 25000) {
                table.put(putList);
                table.flushCommits();
                // Clear the buffer after flushing so rows are not written twice
                putList.clear();
            }
        }
    }

    public String getOneRecord(byte[] columnFamily, byte[] columnQualifier, String rowKey)
            throws IOException {
        Get get = new Get(rowKey.getBytes());
        get.setMaxVersions(1);
        Result rs = table.get(get);
        System.out.println(rs.containsColumn(columnFamily, columnQualifier));
        KeyValue result = rs.getColumnLatest(columnFamily, columnQualifier);
        if (rs.containsColumn(columnFamily, columnQualifier))
            return Bytes.toString(result.getValue());
        else
            return "NA";
    }

    public boolean columnQualifierExists(String tableName, String ColumnFamily,
            String ColumnQualifier, String rowKey) throws IOException {
        Get get = new Get(rowKey.getBytes());
        Result rs = table.get(get);
        return rs.containsColumn(ColumnFamily.getBytes(), ColumnQualifier.getBytes());
    }
}

Note: I am using a MapR (M3) cluster with Hue as the interface for Oozie. Hive version: 1.0, HCatalog version: 1.0.

I could not find any way to initialize HCatInputFormat from the Oozie MapReduce action. However, I have a workaround, as follows:

Create a LazyHCatInputFormat by extending HCatInputFormat, and override the getJobInfo method to handle the initialization. It gets invoked as part of the getSplits(..) call.

private static void lazyInit(Configuration conf) {
    try {
        if (conf == null) {
            conf = new Configuration(false);
        }
        conf.addResource(new Path(System.getProperty("oozie.action.conf.xml")));
        conf.addResource(new org.apache.hadoop.fs.Path("hive-config.xml"));
        String databaseName = conf.get("LazyHCatInputFormat.databaseName");
        String tableName = conf.get("LazyHCatInputFormat.tableName");
        String partitionFilter = conf.get("LazyHCatInputFormat.partitionFilter");
        setInput(conf, databaseName, tableName);
        //setFilter(partitionFilter);
        //System.out.println("After lazyinit : " + conf.get("mapreduce.lib.hcat.job.info"));
    } catch (Exception e) {
        System.out.println("*** LAZY INIT FAILED ***");
        //e.printStackTrace();
    }
}

public static InputJobInfo getJobInfo(Configuration conf)
        throws IOException {
    String jobString = conf.get("mapreduce.lib.hcat.job.info");
    if (jobString == null) {
        lazyInit(conf);
        jobString = conf.get("mapreduce.lib.hcat.job.info");
        if (jobString == null) {
            throw new IOException("job information not found in JobContext. HCatInputFormat.setInput() not called?");
        }
    }
    return (InputJobInfo) HCatUtil.deserialize(jobString);
}
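For readability, a minimal sketch of how these two methods could sit together in the subclass (the package name matches the com.xyz.LazyHCatInputFormat value used in the configuration below; the imports assume the org.apache.hive.hcatalog packages from the question):

package com.xyz;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.InputJobInfo;

public class LazyHCatInputFormat extends HCatInputFormat {
    // Paste the getJobInfo(Configuration) and lazyInit(Configuration)
    // methods shown above into this class body.
}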

In the Oozie map-reduce action, add the following configuration:

    <property>
        <name>mapreduce.job.inputformat.class</name>
        <value>com.xyz.LazyHCatInputFormat</value>
    </property>
    <property>
        <name>LazyHCatInputFormat.databaseName</name>
        <value>HCAT DatabaseNameHere</value>
    </property>
    <property>
        <name>LazyHCatInputFormat.tableName</name>
        <value>HCAT TableNameHere</value>
    </property>
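Since the mapper uses the new (org.apache.hadoop.mapreduce) API, the action will typically also need the new-API switch and the mapper/output classes set as properties. A sketch, assuming the class names from the question (the exact set of properties may differ on your cluster):

    <property>
        <name>mapred.mapper.new-api</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.job.map.class</name>
        <value>HBaseValdiateInsert</value>
    </property>
    <property>
        <name>mapreduce.job.output.key.class</name>
        <value>org.apache.hadoop.io.Text</value>
    </property>
    <property>
        <name>mapreduce.job.output.value.class</name>
        <value>org.apache.hadoop.io.Text</value>
    </property>
    <property>
        <name>mapreduce.job.reduces</name>
        <value>0</value>
    </property>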

This may not be the best implementation, but it is a quick hack that got it working.
