hadoop CustomInputFormat not being called



I have written a custom input format and configured it in the job, yet the InputFormat is never invoked. I put a few SOPs (System.out.println statements) in it so they would print while the code runs, but none of them print. Even if I comment out the custom input format in the driver class, the output stays exactly the same. Where am I going wrong?

Driver class

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TestDriver {
    public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException{
        Configuration conf = new Configuration();
        Job job = new Job(conf,"Custom Format");
        job.setMapperClass(CustomInputFormatmapper.class);
        job.setReducerClass(CustomInputFormatReducer.class);
        job.setInputFormatClass(CustomInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.getConfiguration().set("fs.file.impl", "com.learn.WinLocalFileSystem");
        String inputPath = "In\\VISA_Details.csv";     // backslashes must be escaped in Java string literals
        Path inPath = new Path(inputPath);
        String outputPath = "C:\\Users\\Desktop\\Hadoop learning\\output\\run1";
        Path outPath = new Path(outputPath);
        FileInputFormat.setInputPaths(job, inPath );
        FileOutputFormat.setOutputPath(job, outPath);
        System.out.println(job.waitForCompletion(true));

    }
}
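To narrow this down, one check is to print which InputFormat the Job object actually ends up with, right before submitting it. This is just a debugging line that could go inside main(), before job.waitForCompletion(true); it is not part of my original code:

        // prints the fully-qualified name of the InputFormat the job is configured with
        System.out.println("InputFormat in use: " + job.getInputFormatClass().getName());

If this prints the custom class name, the job configuration itself is fine and the problem is inside the input format or record reader; if it prints TextInputFormat, the setInputFormatClass() call is not taking effect.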

Custom Input Format

import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class CustomInputFormat extends TextInputFormat{
    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)
    {
        System.out.println(" ------------ INSIDE createRecordReader()--------------");
        return new CustomRecordReader();
    }
}

Custom Record Reader

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
public class CustomRecordReader extends RecordReader {
    private CompressionCodecFactory compressionCodecs;
    private final int NLINESTOPROCESS = 3;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key;
    private Text value;
    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
    }
    @Override
    public Object getCurrentKey() throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return null;
    }
    @Override
    public Object getCurrentValue() throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return null;
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return 0;
    }
    @Override
    public void initialize(InputSplit inputsplit,TaskAttemptContext taskattemptcontext) 
            throws IOException, InterruptedException {
        System.out.println(" ---------- INSIDE INITILISE:  THIS IS NOT PRINTING----------");
        FileSplit split = (FileSplit)inputsplit;
        Configuration job = taskattemptcontext.getConfiguration();
        maxLineLength = job.getInt("mapred.linerecordreader.maxlength", 2147483647);
        start = split.getStart();
        end = start + split.getLength();
        Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        CompressionCodec codec = compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if(codec != null)
        {
            in = new LineReader(codec.createInputStream(fileIn), job);
            end = 9223372036854775807L;
        } else
        {
            if(start != 0L)
            {
                skipFirstLine = true;
                start--;
                fileIn.seek(start);
            }
            in = new LineReader(fileIn, job);
        }
        if(skipFirstLine)
            start += in.readLine(new Text(), 0, (int)Math.min(2147483647L, end - start));
        pos = start;
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        System.out.println(" ---------- INSIDE nextKeyValue()------------");
        if(key==null){
            key = new LongWritable();
        }
        if(value==null){
            value = new Text();
        }
        key.set(pos);
        value.clear();
        final Text newLine = new Text("\n");
        Text newVal = new Text();
         int newSize = 0;
        for(int i =0;i<NLINESTOPROCESS;i++){
             Text v = new Text();
             while(pos<end){
                 newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength));
                 value.append(v.getBytes(),0, v.getLength());
                 value.append(newLine.getBytes(),0, newLine.getLength());
                 if (newSize == 0) {
                        break;
                    }
                    pos += newSize;
                    if (newSize < maxLineLength) {
                        break;
                    }
             }
        }

        return false;
    }
}

Mapper class

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class CustomInputFormatmapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
    public void map(LongWritable key, Text val, Context context)throws IOException, InterruptedException{
        String value = val.toString();
        String[] totalRows = value.split("\n");
        int count =totalRows.length;
        context.write(new LongWritable(Long.valueOf(count)), new LongWritable(1L));
    }
}

Reducer class

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class CustomInputFormatReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
    public void reduce(LongWritable key, Iterable<LongWritable> val, Context context) throws IOException, InterruptedException{
        System.out.println(" --------REDUCER--------");
        long count =0;
        for(LongWritable vals: val){
            count++;
        }
        context.write(key, new LongWritable(count));
    }
}

I am answering my own question, since it may help others who run into the problem I faced. The issue was with the packages I was importing. The mistakes I made are listed below.

Custom Input Format class

1) Missed the @Override annotation.
2) Imported TaskAttemptContext from org.apache.hadoop.mapred instead of org.apache.hadoop.mapreduce.
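For reference, here is a sketch of what the corrected CustomInputFormat looks like once the @Override annotation is restored and everything comes from the org.apache.hadoop.mapreduce packages (this assumes CustomRecordReader is also fixed as described below, so that it extends RecordReader<LongWritable, Text>):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;   // new API, not org.apache.hadoop.mapred
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class CustomInputFormat extends TextInputFormat {
    @Override   // now genuinely overrides TextInputFormat.createRecordReader()
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        System.out.println(" ------------ INSIDE createRecordReader()--------------");
        return new CustomRecordReader();
    }
}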

Custom Record Reader class

1) The imports were done from org.apache.hadoop.mapred.* instead of org.apache.hadoop.mapreduce.*.
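Concretely, the MapReduce-related imports at the top of CustomRecordReader should look like this (the fs, io, compress and LineReader imports stay the same):

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
// the new-API FileSplit lives under lib.input, not under org.apache.hadoop.mapred
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

It also helps to declare the class as CustomRecordReader extends RecordReader<LongWritable, Text> rather than the raw RecordReader, so that getCurrentKey() and getCurrentValue() can return LongWritable and Text directly, which is what the mapper expects.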
