我是Hive和MapReduce的新手,非常感谢您的回答,并提供正确的方法。
我已经定义了一个外部表logs
在hive分区上的日期和起源服务器与一个外部位置在hdfs /data/logs/
。我有一个MapReduce作业,它获取这些日志文件并将它们拆分并存储在上面提到的文件夹下。像
"/data/logs/dt=2012-10-01/server01/"
"/data/logs/dt=2012-10-01/server02/"
...
...
从MapReduce作业中,我想在Hive的表日志中添加分区。我知道这两种方法
- alter table命令——修改表命令太多
- 添加动态分区
对于方法二,我只看到INSERT OVERWRITE
的例子,这对我来说不是一个选择。是否有办法在作业结束后将这些新分区添加到表中?
要在Map/Reduce作业中做到这一点,我建议使用Apache HCatalog,这是Hadoop下的一个新项目。
HCatalog实际上是HDFS之上的一个抽象层,因此您可以以标准化的方式编写输出,无论是来自Hive, Pig还是M/R。您可以使用输出格式HCatOutputFormat
直接从Map/Reduce作业中加载Hive中的数据。以下是摘自官方网站的一个例子:
为(A =1,b=1)写一个特定分区的当前代码示例是这样的:
Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("a", "1");
partitionValues.put("b", "1");
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);
如果要写入多个分区,则必须使用上述每个分区启动单独的作业。
您还可以在HCatalog中使用动态分区,在这种情况下,您可以在同一个作业中加载尽可能多的分区!
我建议你在上面提供的网站上进一步阅读HCatalog,如果需要的话,它会给你更多的细节。
实际上,事情比这更复杂一些,不幸的是,它在官方来源中没有文档记录(截至目前),并且需要几天的挫折才能弄清楚。
我发现我需要做以下的事情来让HCatalog Mapreduce作业能够写动态分区:
在我的工作(通常是reducer)的记录写入阶段,我必须手动将我的动态分区(HCatFieldSchema)添加到我的HCatSchema对象。
问题是HCatOutputFormat.getTableSchema(config)实际上并没有返回分区字段。它们需要手动添加
HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null);
HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null);
schema.append(hfs1);
schema.append(hfs2);
下面是使用HCatalog在一个作业中使用动态分区写入多个表的代码,该代码已在Hadoop 2.5.0, Hive 0.13.1上进行了测试:
// ... Job setup, InputFormatClass, etc ...
String dbName = null;
String[] tables = {"table0", "table1"};
job.setOutputFormatClass(MultiOutputFormat.class);
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
List<String> partitions = new ArrayList<String>();
partitions.add(0, "partition0");
partitions.add(1, "partition1");
HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null);
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null);
for (String table : tables) {
configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, CatRecord.class);
OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null);
outputJobInfo.setDynamicPartitioningKeys(partitions);
HCatOutputFormat.setOutput(
configurer.getJob(table), outputJobInfo
);
HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration());
schema.append(partition0);
schema.append(partition1);
HCatOutputFormat.setSchema(
configurer.getJob(table),
schema
);
}
configurer.configure();
return job.waitForCompletion(true) ? 0 : 1;
映射器:
public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
HCatRecord record = new DefaultHCatRecord(3); // Including partitions
record.set(0, value.toString());
// partitions must be set after non-partition fields
record.set(1, "0"); // partition0=0
record.set(2, "1"); // partition1=1
MultiOutputFormat.write("table0", null, record, context);
MultiOutputFormat.write("table1", null, record, context);
}
}