Hadoop job chaining error: java.io.IOException: wrong value class



I have two mapper classes and two reducer classes. I want the flow to be: MapperOne --> ReducerOne --> MapperTwo --> ReducerTwo.

Here is my driver class code.

public class StockDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        System.out.println(" Driver invoked------");
        Configuration config = new Configuration();
        config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        config.set("mapred.textoutputformat.separator", " --> ");
        // Backslashes in Java string literals must be escaped.
        String inputPath = "In\\NYSE_daily_prices_Q_less.csv";
        String outpath = "C:\\Users\\Desktop\\Hadoop\\run1";
        String outpath2 = "C:\\Users\\Desktop\\Hadoop\\run2";
        Job job1 = new Job(config,"Stock Analysis: Creating key values");
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(StockDetailsTuple.class);
        job1.setMapperClass(StockMapperOne.class);
        job1.setReducerClass(StockReducerOne.class);
        FileInputFormat.setInputPaths(job1, new Path(inputPath));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(outpath));
        //FileOutputFormat.setOutputPath(job1, new Path(outpath));
        //THE SECOND MAP_REDUCE TO DO CALCULATIONS
        Job job2 = new Job(config,"Stock Analysis: Calculating Covariance");
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setMapperClass(StockMapperTwo.class);
        job2.setReducerClass(StockReducerTwo.class);
        SequenceFileInputFormat.setInputPaths(job2, new Path(outpath));
        FileOutputFormat.setOutputPath(job2, new Path(outpath2));

        System.out.println(job1.waitForCompletion(true));
        System.out.println(job2.waitForCompletion(true));
    }
}

My MapperOne class

public class StockMapperOne extends Mapper<LongWritable, Text, Text, StockDetailsTuple> {
    private StockDetailsTuple stock= new StockDetailsTuple();
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String valueString = value.toString();
        String[] tokenArray = valueString.split(",");
        String dateOfStock= tokenArray[2];
        int month = Integer.parseInt(dateOfStock.substring(5, 7));
        int year = Integer.parseInt(dateOfStock.substring(0,4));
        stock.setStockDate(dateOfStock); 
        stock.setStockName(tokenArray[1]);
        stock.setStockPrice(Float.parseFloat(tokenArray[4]));
        stock.setMonthNum(month);
        stock.setYear(year);
        System.out.println(" Date of stock: "+dateOfStock + "  stock:   "+stock);
        context.write(new Text(dateOfStock.trim()), stock);
    }
}

REDUCER ONE CLASS (I am setting the key, value pair to Text, Text)

public class StockReducerOne extends Reducer<Text, StockDetailsTuple, Text, Text> {
    public void reduce(Text key, Iterable<StockDetailsTuple> values, Context context) throws IOException, InterruptedException{
            for(StockDetailsTuple val: values){
                System.out.println("VAL : "+ val);
                priceMap.put(val.getStockName(),val.getStockPrice());
                stockGroups = stockGroups.append(val.getStockName()).append(":"); 
                month=val.getMonthNum();
                count++;
            }
            if(count>1){
                stockGroupAfterPermutations =doPermutation(stockGroups.toString());
                formNewKey(context,this.stockGroupAfterPermutations, priceMap, month);
            }
    }

    private void formNewKey(Context context,List<String> stockGroup, Map<String, Float> priceMap2,int month2) 
            throws IOException, InterruptedException {
        System.out.println(" FORMING NEW KEY----------");
        String tempKey=null,tempVal = null ;
        for(String stkgrp:stockGroup){
            String[] splitTokens = stkgrp.split(",");
            //The below line gives the key as 3 QRR,QTM 
            // value as 12.22 13.33
            Arrays.sort(splitTokens);
            //System.out.println(" SORTED ARRAY:   -->  "+ splitTokens);
            tempKey=String.valueOf(month2)+","+splitTokens[0]+","+splitTokens[1];
            tempVal=splitTokens[0]+","+String.valueOf(priceMap2.get(splitTokens[0]))+","+splitTokens[1]+","+String.valueOf(priceMap2.get(splitTokens[1]));
            //Finally our key value pair looks like
            //7,QTM,QXM --> QTM,3.22,QXM,9.61
        }
        System.out.println(" NEW KEY: "+tempKey +"  NEW VAL: "+ tempVal);
        context.write(new Text(tempKey), new Text(tempVal));
    }

}

MY WRITABLE StockDetailsTuple Class
-------------------------------------
public class StockDetailsTuple implements Writable{
    private String stockName;
    private int monthNum;
    private int year;
    private float stockPrice;
    private String stockDate;
    @Override
    public void readFields(DataInput in) throws IOException {
        this.monthNum=in.readInt();
        this.year=in.readInt();
        this.stockPrice=in.readFloat();
        this.stockName=in.readUTF();
        this.stockDate=in.readUTF();
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.monthNum);
        out.writeInt(this.year);
        out.writeFloat(this.stockPrice);
        out.writeUTF(this.stockName);
        out.writeUTF(this.stockDate);
    }
    public String getStockName() {
        return stockName;
    }
    public void setStockName(String stockName) {
        this.stockName = stockName;
    }
    public int getMonthNum() {
        return monthNum;
    }
    public void setMonthNum(int monthNum) {
        this.monthNum = monthNum;
    }
    public int getYear() {
        return year;
    }
    public void setYear(int year) {
        this.year = year;
    }
    public float getStockPrice() {
        return stockPrice;
    }
    public void setStockPrice(float stockPrice) {
        this.stockPrice = stockPrice;
    }
    public String getStockDate() {
        return stockDate;
    }
    public void setStockDate(String stockDate) {
        this.stockDate = stockDate;
    }
    @Override
    public String toString() {
        return "StockDetailsTuple [stockName=" + stockName + ", monthNum="
                + monthNum + ", year=" + year + ", stockPrice=" + stockPrice
                + ", stockDate=" + stockDate + "]";
    }

}

When I run this code, I get an exception in ReducerOne:

13/07/15 23:11:49 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: wrong value class: org.apache.hadoop.io.Text is not class com.assignment.StockDetailsTuple
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1050)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:74)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:588)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)

The code runs if I change the value type in ReducerOne to StockDetailsTuple. What am I missing here? I want the reducer's output key and value to be Text, Text. Please help.

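setOutputKeyClass and setOutputValueClass define the types of the objects emitted by the reducer. According to your reducer's source code, those types should be Text and Text, not Text and StockDetailsTuple. Because job1 declares StockDetailsTuple as its output value class, the sequence file writer rejects the Text values that StockReducerOne actually writes.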
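To make the failure mode concrete, here is a minimal sketch of the kind of check that produces this message. It uses no Hadoop classes: CheckedWriter and tryAppend are hypothetical stand-ins written for illustration, and String and Integer stand in for Text and StockDetailsTuple. It only mirrors the idea that the writer compares each value's runtime class with the class declared on the job.

```java
import java.io.IOException;

public class ValueClassCheckDemo {

    /** Simplified, hypothetical stand-in for a writer that enforces a declared value class. */
    static final class CheckedWriter {
        private final Class<?> valueClass;
        private int written = 0;

        CheckedWriter(Class<?> valueClass) {
            this.valueClass = valueClass;
        }

        void append(Object value) throws IOException {
            // Reject any value whose runtime class differs from the declared one.
            if (value.getClass() != valueClass) {
                throw new IOException("wrong value class: "
                        + value.getClass().getName() + " is not " + valueClass);
            }
            written++;
        }

        int written() {
            return written;
        }
    }

    /** Returns the exception message, or null if the append succeeded. */
    static String tryAppend(Class<?> declared, Object value) {
        try {
            new CheckedWriter(declared).append(value);
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Declared Integer but emitted a String: the append is rejected.
        System.out.println(tryAppend(Integer.class, "QTM,3.22,QXM,9.61"));
        // Declared String and emitted a String: the append is accepted.
        System.out.println(tryAppend(String.class, "QTM,3.22,QXM,9.61"));
    }
}
```

The first call corresponds to the situation in the question: the declared value class and the emitted value class disagree, so the write fails before anything reaches the output file.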

Use setMapOutputKeyClass and setMapOutputValueClass to specify the mapper's output types when they differ from the reducer's.
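Applied to job1 from the question, the type declarations might look like the fragment below. This is a sketch, not a tested driver: the wrapper class StockJobOneTypes and the method declareTypes are hypothetical names introduced here, and it assumes StockDetailsTuple from the question is on the classpath.

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class StockJobOneTypes {

    // Hypothetical helper: declares map-side and reduce-side types of job1 separately.
    static void declareTypes(Job job1) {
        // What StockMapperOne emits: Text -> StockDetailsTuple.
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(StockDetailsTuple.class);

        // What StockReducerOne emits: Text -> Text.
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
    }
}
```

With this change the intermediate sequence file would hold Text keys and Text values, so StockMapperTwo (not shown in the question) would presumably need Text as both its input key and input value type.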

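PS: Your StockReducerOne class modifies instance variables (priceMap, count, ...) inside its reduce method, which is probably not what you want. The same reducer object handles many keys, so at the very least you should reset these variables to their initial values on every reduce call; otherwise you may get unexpected results.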
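The effect of that leftover state can be shown without Hadoop. In the sketch below, LeakyReducer and LocalStateReducer are hypothetical stand-ins for the count > 1 test in StockReducerOne; each call to hasPair plays the role of one reduce call for one key.

```java
import java.util.Arrays;
import java.util.List;

public class ReducerStateDemo {

    /** Keeps the counter in a field, so it carries over between calls. */
    static final class LeakyReducer {
        private int count = 0;

        boolean hasPair(List<String> values) {
            for (String ignored : values) {
                count++;
            }
            return count > 1;
        }
    }

    /** Keeps the counter local, so every call starts from zero. */
    static final class LocalStateReducer {
        boolean hasPair(List<String> values) {
            int count = 0;
            for (String ignored : values) {
                count++;
            }
            return count > 1;
        }
    }

    public static void main(String[] args) {
        List<String> firstKeyValues = Arrays.asList("QTM", "QXM");
        List<String> secondKeyValues = Arrays.asList("QRR");

        LeakyReducer leaky = new LeakyReducer();
        System.out.println(leaky.hasPair(firstKeyValues));  // true
        // true as well, although this key has a single value: count is now 3.
        System.out.println(leaky.hasPair(secondKeyValues));

        LocalStateReducer local = new LocalStateReducer();
        System.out.println(local.hasPair(firstKeyValues));  // true
        System.out.println(local.hasPair(secondKeyValues)); // false
    }
}
```

In the reducer from the question, the same reasoning applies to priceMap and stockGroups: declaring them inside reduce, or clearing them at the top of it, keeps values from one date out of the next.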