Hadoop-streaming:将输出写入不同的文件



场景如下

           Reducer1  
         /  
Mapper - - Reducer2  
            
           ReducerN  

在reducer中,我想把数据写在不同的文件中,比如reducer看起来像

def reduce():  
  for line in sys.STDIN:  
    if(line == type1):
      create_type_1_file(line)
    if(line == type2):
      create_type_2_file(line)
    if(line == type3):
      create_type3_file(line)
      ... and so on  
def create_type_1_file(line):
  # writes to file1  
def create_type2_file(line):
  # writes to file2  
def create_type_3_file(line):
  # write to file 3  

考虑写入路径:

file1 = /home/user/data/file1  
file2 = /home/user/data/file2  
file3 = /home/user/data/file3  

当我在pseudo-distributed mode(machine with one node and hdfs daemons running)中运行时,情况很好,因为所有守护进程都将写入同一组文件

问题:-如果我在1000台机器的集群中运行,它们会写入相同的文件集吗?在这种情况下,我是writing to local filesystem,在hadoop streaming中是否有更好的方法来执行此操作?

通常,reduce的o/p会被写入像HDFS这样可靠的存储系统,因为如果其中一个节点宕机,那么与该节点相关的reduce数据就会丢失。在Hadoop框架的上下文中,不可能再次运行特定的reduce任务。此外,一旦作业完成,必须为不同的输入类型整合来自1000个节点的o/p。

HDFS不支持并发写。可能有这样一种情况,多个reducer可能在HDFS中写入同一个文件,这可能会损坏文件。当多个reduce任务在单个节点上运行时,当写入单个本地文件时,并发性也可能是一个问题。

解决方案之一是使用reduce任务特定的文件名,然后将特定输入类型的所有文件组合在一起。

可以使用MultipleOutputs类将输出从Reducer写入多个位置。您可以将file1,file2和file3作为三个文件夹,分别将1000个reducer的输出数据写入这三个文件夹。


作业提交的使用模式:

 Job job = new Job();
 FileInputFormat.setInputPath(job, inDir);
//outDir is the root path, in this case, outDir="/home/user/data/"
 FileOutputFormat.setOutputPath(job, outDir);
//You have to assign the output formatclass.Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000. To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
 job.setMapperClass(MOMap.class);
 job.setReducerClass(MOReduce.class);
 ...
 job.waitForCompletion(true);

在Reducer中的使用:

private MultipleOutputs out;
 public void setup(Context context) {
   out = new MultipleOutputs(context);
   ...
 }
 public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
//'/' characters in baseOutputPath will be translated into directory levels in your file system. Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc. No call to context.write() is necessary.
 for (Text line : values) {
    if(line == type1)
      out.write(key, new Text(line),"file1/part");
  else  if(line == type2)
      out.write(key, new Text(line),"file2/part");
 else   if(line == type3)
      out.write(key, new Text(line),"file3/part");
   }
 }
 protected void cleanup(Context context) throws IOException, InterruptedException {
       out.close();
   }

裁判:https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

相关内容

  • 没有找到相关文章

最新更新