单节点集群在多节点集群上工作



我已经为单机集群编写了mapreduce java代码而不使用工具,它将在多节点集群上工作还是我必须进行更改?以下代码对字符串进行标记并计算每个文本文件

的词频率
public class tr 
    {
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,Text,IntWritable>
    {
       Text word=new Text();
       IntWritable one=new IntWritable(1);
               String imptoken;
       public static  List<String> stopwords=new ArrayList<String>();
       public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException
       {
                       addwords();
              String line=value.toString();
                      line=line.replaceAll("[^A-Za-z]"," ").toLowerCase();
                      StringTokenizer st=new StringTokenizer(line);
          while(st.hasMoreTokens())
        {
                 imptoken=st.nextToken();
                             if(stopwords.contains(imptoken))
               {
               }               
               else
               {
                   word.set(imptoken);
                   output.collect(word, one); 
               }                              
                   }
    }
          public void addwords() throws IOException
      {
     FileSystem fs = FileSystem.get(new Configuration());
     Path stop=new Path("/user/hduser/stopword.txt");
     BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(stop)));
     String stopword=br.readLine();
     while(stopword!=null)
     {
         stopwords.add(stopword);
         stopword=br.readLine();
     }
      }
}
public static class Reduce extends MapReduceBase implements Reducer<Text,IntWritable, Text, IntWritable>
{
    public void reduce(Text key,Iterator<IntWritable> value,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException
    {
        int sum=0;
        while(value.hasNext())
        {
            sum=sum+value.next().get();
        }   
                   /* Path paths=new Path("/user/hduser/input1/");
        FileSystem fs=FileSystem.get(new Configuration());
        FileStatus[] status = fs.listStatus(paths);
                    Path[] list = FileUtil.stat2Paths(status);
                    String keystr=key.toString();
                    for(Path file : list)
                    { 
                       BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(file)));
               String word=br.readLine();
               while(word!=null)
               {
                 if(word.equals(keystr))
                 {
                     sum=0;
                 }
                 word=br.readLine();
                }
                     }*/
                     output.collect(key, new IntWritable(sum));
            }       
    }
public static void main(String args[]) throws IOException
{             
    FileSystem fs = FileSystem.get(new Configuration());
    Path[] paths = new Path[args.length];
    for (int i = 0; i < paths.length; i++) 
    {
        paths[i] = new Path(args[i]);
    }
    FileStatus[] status = fs.listStatus(paths);
            Path[] listedPaths = FileUtil.stat2Paths(status);
    FSDataInputStream in = null;
    for (Path p : listedPaths) 
    {
         JobConf conf = new JobConf(tr.class);
             conf.setJobName("tr");
             conf.setOutputKeyClass(Text.class);
             conf.setOutputValueClass(IntWritable.class);
             conf.setMapperClass(Map.class);
             conf.setCombinerClass(Reduce.class);
             conf.setReducerClass(Reduce.class);
             conf.setInputFormat(TextInputFormat.class);
                     conf.setOutputFormat(TextOutputFormat.class);
             String name=p.getName();
             String absolutepath=p.getParent().toString()+"/"+name;
             FileInputFormat.setInputPaths(conf, new Path(absolutepath));
                     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
                     JobClient.runJob(conf); 
                     Path local=new Path("/home/hduser/meproj/projectfiles/");
                     Path source=new Path(args[1]+"/"+"part-00000");
                     fs.copyToLocalFile(source, local);
                     File  file=new File("/home/hduser/meproj/projectfiles/part-00000");
                     file.renameTo(new File("/home/hduser/meproj/projectfiles/"+name));
                     fs.delete(new Path(args[1]), true);
    }
}

}

当你为Hadoop编写程序时,它将适用于所有集群设置,除非你特别做了一些事情来破坏它,比如在一台机器上处理本地文件。

你在Mapper和Reducer中以独立于设置的方式做工作(这是你应该做的),所以它应该在任何地方工作。

这与您的问题无关,但您不应该在文件上循环并在每个路径上运行独立的作业。真的,你应该在所有这些上运行一个Job。您可以将所有这些单独的路径放在同一个文件夹中,并指定该文件夹作为输入。或者您可以在多个路径上运行hadoop(参见此答案)

相关内容

  • 没有找到相关文章

最新更新