Error when parsing CSV in Hadoop MapReduce

While parsing a CSV file, I get the following error in the map function.

14/07/15 19:40:05 INFO mapreduce.Job: Task Id : attempt_1403602091361_0018_m_000001_2, Status : FAILED
Error: java.lang.ArrayIndexOutOfBoundsException: 4
        at com.test.mapreduce.RetailCustomerAnalysis_2$MapClass.map(RetailCustomerAnalysis_2.java:55)
        at com.test.mapreduce.RetailCustomerAnalysis_2$MapClass.map(RetailCustomerAnalysis_2.java:1)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:429)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

The mapper code is given below.

package com.test.mapreduce;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RetailCustomerAnalysis_2 extends Configured implements Tool {
     public static class MapClass extends MapReduceBase
     implements Mapper<Text, Text, Text, Text> {
          private Text key1 = new Text();
          private Text value1 = new Text();

     public void map(Text key, Text value,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {

         String line = value.toString();
         String[] split = line.split(",");

         key1.set(split[0].trim()); 
         /* line 55, where the error occurs */
         value1.set(split[4].trim()); 

         output.collect(key1, value1);
     }
 }

 public int run(String[] args) throws Exception {
     Configuration conf = getConf();
     JobConf job = new JobConf(conf, RetailCustomerAnalysis_2.class);
     Path in = new Path(args[0]);
     Path out = new Path(args[1]);
     FileInputFormat.setInputPaths(job, in);
     FileOutputFormat.setOutputPath(job, out);
     job.setJobName("RetailCustomerAnalysis_2");
     job.setMapperClass(MapClass.class);
     job.setReducerClass(Reduce.class);
     job.setInputFormat(KeyValueTextInputFormat.class);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
    // job.set("key.value.separator.in.input.line", ",");
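     // note: with the line above commented out, KeyValueTextInputFormat falls
     // back to its default key/value separator, which is a tab character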
     JobClient.runJob(job);
     return 0;
 }
 public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), new RetailCustomerAnalysis_2(), args);
     System.exit(res);
 }
}

The sample input used to run this code is as follows:

PRAVEEN,4002012,Kids,02GK,7/4/2010
PRAVEEN,400201,TOY,020383,14/04/2014

I am using the following command and input to run the application.

yarn jar RetailCustomerAnalysis_2.jar com.test.mapreduce.RetailCustomerAnalysis_2 /hduser/input5 /hduser/output5

Add a check to see whether the input line has all of its fields defined, and skip the record in the map function if it does not. In the new API, the code would look like this:

    if (split.length != noOfFields) {
        return;
    }
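
For the old (mapred) API used in the question, a minimal sketch of the same guard inside the question's MapClass might look like the following; noOfFields is an assumed constant (4 here, because the value part of each record carries four fields after the key), and it assumes the comma separator is actually configured:

    public void map(Text key, Text value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {

        // with "," as the key/value separator, key holds the text before the
        // first comma (e.g. "PRAVEEN") and value holds the rest of the line
        String[] split = value.toString().split(",");

        // assumed constant: the number of fields expected in the value part
        final int noOfFields = 4;
        if (split.length != noOfFields) {
            return; // skip malformed lines instead of throwing
        }

        value1.set(split[3].trim()); // the last valid index is 3, not 4
        output.collect(key, value1);
    }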

In addition, if you are interested, you can set up Hadoop counters to find out how many lines there were in total, and how many of them contained all the required fields of the CSV file:

    if (split.length != noOfFields) {
        context.getCounter(MTJOB.DISCARDED_ROWS_DUE_MISSING_FIELDS)
               .increment(1);
        return;
    }
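
A minimal self-contained sketch in the new (mapreduce) API, with an assumed counter enum whose names mirror the MTJOB snippet above; Hadoop aggregates one counter per enum constant and reports them in the job output:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class GuardedMapper extends Mapper<Text, Text, Text, Text> {

        // assumed counter enum; one counter is kept per constant
        public enum MTJOB { TOTAL_ROWS, DISCARDED_ROWS_DUE_MISSING_FIELDS }

        private static final int NO_OF_FIELDS = 4; // fields expected after the key
        private final Text value1 = new Text();

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.getCounter(MTJOB.TOTAL_ROWS).increment(1);

            String[] split = value.toString().split(",");
            if (split.length != NO_OF_FIELDS) {
                context.getCounter(MTJOB.DISCARDED_ROWS_DUE_MISSING_FIELDS)
                       .increment(1);
                return; // discard the malformed row
            }

            value1.set(split[3].trim());
            context.write(key, value1);
        }
    }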

split[] only contains the elements split[0], split[1], split[2] and split[3].

In the case of KeyValueTextInputFormat, the first string before the separator is taken as the key and the rest of the line as the value. A separator byte (a comma, a space, etc.) is used to split the key from the value in each record.

In your code, the string before the first comma becomes the key and the rest of the line becomes the value. When you split that value, there are only 4 strings in it. Therefore the String array only goes from split[0] to split[3], and split[4] does not exist.
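
To make this concrete, here is how the first sample line splits, assuming the commented-out separator setting is enabled so that "," separates the key from the value:

    // input record:  PRAVEEN,4002012,Kids,02GK,7/4/2010
    // key   (Text):  "PRAVEEN"
    // value (Text):  "4002012,Kids,02GK,7/4/2010"
    String[] split = "4002012,Kids,02GK,7/4/2010".split(",");
    // split.length == 4, so the valid indices are split[0] .. split[3];
    // split[4] throws java.lang.ArrayIndexOutOfBoundsException: 4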

Any suggestions or corrections are welcome.
