有人能帮我一下吗?我需要让 Reducer 类输出文本(Text)类型的值。但是当我运行作业时,它报了下面的错误:
错误:
java.lang.NumberFormatException: For input string: "open"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:492)
    at java.lang.Integer.parseInt(Integer.java:527)
    at com.sciera.mapreduce.OpenClose$TextSumReducer.reduce(OpenClose.java:85)
    at com.sciera.mapreduce.OpenClose$TextSumReducer.reduce(OpenClose.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
public class ClassName {
public static class TokenizerMapper extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] str = value.toString().split(",");
String rwid=str[10];
context.write(new Text(rwid), new Text(value));
}
}
public static class TextSumReducer extends Reducer<Text,Text,Text,Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int open = 0;
String value = "";
String pDate = "0000-00-00";
for (Text val : values) {
String[] valSeg = val.toString().split(",");
int id = Integer.parseInt(valSeg[0]);
String date = valSeg[4];
String fs = valSeg[6];
String fDate = valSeg[7];
String fStatus = valSeg[8];
int lId = Integer.parseInt(valSeg[9]);
if(date.equalsIgnoreCase("null")){
date = "0000-00-00";
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date1,date2;
try {
date1 = sdf.parse(date);
date2 = sdf.parse(pDate);
long diff = date1.getTime() - date2.getTime();
long gapDays = TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS);
String origStatus = "";
if(pDate!=fDate && (fStatus != "" && fStatus!="null") && lId>0 && (fDate!= "null" && fDate!="0000-00-00")){
if(fStatus.equalsIgnoreCase("a") || fStatus.equalsIgnoreCase("aa") || fStatus.equalsIgnoreCase("aaa")){
origStatus = "a";
open += 1;
}else if(!fStatus.equalsIgnoreCase("ccc")){
origStatus = "b";
}else{
origStatus = "";
}
pDate = fDate;
value = origStatus;
result.set(new Text(value));
context.write(key, result);
}
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
Path p = new Path(args[1]);
FileSystem date = FileSystem.get(new Configuration());
date.exists(p);
date.delete(p, true);
Configuration conf = new Configuration();
conf.set("key.value.separator.in.input.line", ",");
Job job = Job.getInstance(conf, "Sample");
job.setJarByClass(ClassName.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(TextSumReducer.class);
job.setReducerClass(TextSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
最后,我通过另一个代码逻辑得到了答案。
public class TestClass {
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text>{
private Text word = new Text();
private final NullWritable nullKey = NullWritable.get();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] valSeg = value.toString().split("\|");
String rwid = valSeg[10];
context.write(new Text(rwid), value);
}
}
public static class TextSumReducer extends Reducer<Text,Text,Text,Text> {
private final NullWritable nullKey = NullWritable.get();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int valueItr = 0;
for (Text val : values) {
String valueToSplit = val.toString().trim();
String valSeg[] = valueToSplit.split("\|");
int open = 0;
String value = "";
String prevDateAgg = "0000-00-00";
int prevId = 0;
String[] valSeg = val.toString().split(",");
String source = valSeg[2];
String state = valSeg[3];
String dateAggregated = valSeg[4];
String status = valSeg[5];
String firstSource = valSeg[6];
String firstDate = valSeg[7];
String firstStatus = valSeg[8];
int lifeReId = Integer.parseInt(valSeg[9]);
String rwid = valSeg[10];
if(dateAggregated.equalsIgnoreCase("null")){
dateAggregated = "0000-00-00";
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date1,date2;
try {
date1 = sdf.parse(dateAggregated);
date2 = sdf.parse(prevDateAgg);
long diff = date1.getTime() - date2.getTime();
long gapDays = TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS);
String origStatus = "";
if(prevDateAgg!=firstDate && (firstStatus != "" && firstStatus!="null") && lifeReId>0 && (firstDate!= "null" && firstDate!="0000-00-00")){
if(firstStatus.equalsIgnoreCase("for_rent") || firstStatus.equalsIgnoreCase("for_sale") || firstStatus.equalsIgnoreCase("for sale") || firstStatus.equalsIgnoreCase("for rent") || firstStatus.equalsIgnoreCase("Foreclosure") || firstStatus.equalsIgnoreCase("Reo")){
origStatus = "open";
String content = rwid + "|" + firstDate + "|" + origStatus;
context.write(key,new Text(content));
}else if(!firstStatus.equalsIgnoreCase("off_market")){
origStatus = "close";
}else{
origStatus = "";
}
prevDateAgg = firstDate;
value = origStatus;
}
} catch (ParseException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
Path p = new Path(args[1]);
FileSystem fs = FileSystem.get(new Configuration());
fs.exists(p);
fs.delete(p, true);
Configuration conf = new Configuration();
conf.set("key.value.separator.in.input.line", "|");
Job job = Job.getInstance(conf, "OpenClose");
job.setJarByClass(TestClass.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(TextSumReducer.class);
job.setReducerClass(TextSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}