拆分文件 w.r.t 输入文件 MapReduce.



有人可以建议我以下代码中有什么问题吗?

你能帮我如何使用这个Mapreduce程序获得下面的输出吗?实际上这段代码工作正常,但输出与预期不符......输出在两个文件中生成,但在名称.txt文件或年龄.txt文件中,输出正在交换

输入文件:

Name:A
Age:28
Name:B
Age:25
Name:K
Age:20
Name:P
Age:18
Name:Ak
Age:11
Name:N
Age:14
Name:Kr
Age:26
Name:Ra
Age:27

我的输出应该分为姓名和年龄

名称文件:

Name:A
Name:B
Name:K
Name:P
Name:Ak
Name:N
Name:Kr
Name:Ra

年龄文件:

Age:28
Age:25
Age:20
Age:18
Age:11
Age:14
Age:26
Age:27

我的代码 :

我的映射器.java

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text,     Text, Text> {
            public void map(LongWritable key, Text value,OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
                    String [] dall=value.toString().split(":");
                        output.collect(new Text(dall[0]),new Text(dall[1]));
            }
    }

MyReducer.Java:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

    public class MyReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
            public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
                    while (values.hasNext()) {
                            output.collect(new Text(key),new Text(values.next()));
                    }
            }
    }

多文件输出.java:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.*;
    public class MultiFileOutput extends MultipleTextOutputFormat<Text, Text>{
        protected String generateFileNameForKeyValue(Text key, Text value,String name) {
                   //return new Path(key.toString(), name).toString();
                    return key.toString();
            }
            protected Text generateActualKey(Text key, Text value) {
                             //return new Text(key.toString());
                                return null;
                  }
    }

我的驱动程序.java:

import java.io.IOException;
import java.lang.Exception;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
public class MyDriver{
        public static void main(String[] args) throws Exception,IOException {

            Configuration mycon=new Configuration();
            JobConf conf = new JobConf(mycon,MyDriver.class);
        //JobConf conf = new JobConf(MyDriver.class);
            conf.setJobName("Splitting");
            conf.setMapperClass(MyMapper.class);
            conf.setReducerClass(MyReducer.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(MultiFileOutput.class);
            conf.setOutputKeyClass(Text.class);
            conf.setMapOutputKeyClass(Text.class);
            //conf.setOutputValueClass(Text.class);
            conf.setMapOutputValueClass(Text.class);


            FileInputFormat.setInputPaths(conf,new Path(args[0]));
            FileOutputFormat.setOutputPath(conf,new Path(args[1]));
            JobClient.runJob(conf);
            //System.err.println(JobClient.runJob(conf));
    }
}

谢谢

好的,

这是比简单的字数统计更复杂的用例:)

所以你需要的是一个复杂的键和一个分区程序。并设置减速器数量=2

您的复杂键可以是文本(名称的串联|A 或 Age|28) 或 CustomWritable(有 2 个实例变量保存类型(名称或年龄)和值)

在映射器中,您创建文本或自定义可写 anfd 将其设置为输出键,值可以只是人的姓名或他的年龄。

创建一个分区程序(实现org.apache.hadoop.mapred.Partitioner)。在getPartition方法中,您基本上根据键决定它将转到哪个化简器。

希望这有帮助。

相关内容

  • 没有找到相关文章

最新更新