I am writing an example that finds the maximum and minimum of a stock's opening and closing prices within a given period, from a from_date to a to_date.
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockByDate extends Configured implements Tool {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        public static Date fromdate;
        public static Date todate;

        {
            try {
                fromdate = df.parse("2000-01-01");
                todate = df.parse("2005-01-01");
            } catch (ParseException f) {
                f.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Date, Open, High, Low, Close, Volume, Adj Close
            // 2010-10-18,40.66,41.74,40.44,41.49,10620000,41.49
            try {
                String[] tokens = value.toString().split(",");
                String date = tokens[0].trim();
                // String open_close = tokens[1].concat(",").concat(tokens[4].trim());
                String open_close = tokens[1] + ";" + tokens[4];
                Date givendate = df.parse(date);
                if (givendate.after(fromdate) && givendate.before(todate)) {
                    context.write(new Text(date), new Text(open_close));
                    System.out.println(open_close);
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public static float maxopen = (float) 0;
        public static float minopen = Float.MAX_VALUE;
        public Text maxopenkey;
        public Text minopenkey;
        public static float maxclose = (float) 0;
        public static float minclose = Float.MAX_VALUE;
        public Text maxclosekey;
        public Text minclosekey;

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            boolean maxopenfound = false;
            boolean minopenfound = false;
            boolean maxclosefound = false;
            boolean minclosefound = false;
            for (Text val : values) {
                System.out.println(key.toString() + " " + val.toString());
                String[] temp_present = val.toString().split(";");
                System.out.println(key.toString() + " " + val.toString());
                float open_float = Float.valueOf(temp_present[0].trim());
                float close_float = Float.valueOf(temp_present[1].trim());
                if (open_float >= maxopen) {
                    maxopen = open_float;
                    maxopenkey = key;
                    maxopenfound = true;
                }
                if (open_float <= minopen) {
                    minopen = open_float;
                    minopenkey = key;
                    minopenfound = true;
                }
                /*
                 * if (close_float >= maxclose) { maxclose = close_float; maxclosekey = key; maxclosefound = true; }
                 * if (close_float <= minclose) { minclose = close_float; minclosekey = key; minclosefound = true; }
                 */
                /*
                 * if (it.hasNext()) {
                 *     String[] temp_next = it.next().toString().split(",");
                 *     float open_float_next = Float.valueOf(temp_next[0].trim());
                 *     if (open_float <= open_float_next) { minopen = open_float; minopenkey = key; minopenfound = true; }
                 * }
                 */
                // float presentOpenValue = Float.valueOf(temp[0]);
                // float presentMaxCloseValue = Float.parseFloat(temp[1]);
            }
            // context.write(key, new Text(String.valueOf(maxopen)));
            if (maxopenfound) {
                Text newmax = new Text();
                newmax.set(String.valueOf(maxopen));
                context.write(maxopenkey, newmax);
            }
            if (minopenfound) {
                Text newmin = new Text();
                newmin.set(String.valueOf(minopen));
                context.write(minopenkey, newmin);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "StockByDate");
        job.setJarByClass(StockByDate.class);
        job.setJobName("StockByDate");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new StockByDate(), args);
        System.exit(ret);
    }
}
Here is the exception:
15/06/06 05:19:09 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2000-01-03 104.87511
15/06/06 05:19:09 INFO mapred.LocalJobRunner: reduce task executor complete.
2000-01-03 104.87511
15/06/06 05:19:09 WARN mapred.LocalJobRunner: job_local790515175_0001
java.lang.Exception: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.wordcompute.stock.StockByDate$Reduce.reduce(StockByDate.java:117)
    at org.wordcompute.stock.StockByDate$Reduce.reduce(StockByDate.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/06/06 05:19:10 INFO mapreduce.Job: map 100% reduce 0%
15/06/06 05:19:10 INFO mapreduce.Job: Job job_local790515175_0001 failed with state FAILED due to: NA
15/06/06 05:19:10 INFO mapreduce.Job: Counters: 33
    File System Counters
        FILE: Number of bytes read=550953
        FILE: Number of bytes read=550953
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=8692
        Map output records=1256
        Map output bytes=35609
        Map output materialized bytes=1012
        Input split bytes=136
        Combine input records=1256
        Combine output records=46
        Reduce input groups=0
        Reduce shuffle bytes=
        Reduce input records=0
        Reduce output records=0
        Spilled Records=46
        Shuffled Maps=1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=45
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=
        Total committed heap usage (bytes)=165613568
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=550760
    File Output Format Counters
        Bytes Written=
Your code throws an ArrayIndexOutOfBoundsException; the stack trace points to line 117 of StockByDate.java.

I suspect

float close_float = Float.valueOf(temp_present[1].trim());

fails when temp_present holds only a single element, i.e. when the incoming value contains no ";". Check your values.
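A likely source of such one-element values (my guess from your run() method, not something the log alone proves): job.setCombinerClass(Reduce.class) reuses the reducer as a combiner, and that reducer emits bare numbers such as 104.87511 with no ";" separator; those records then reach the real reduce phase and blow up on temp_present[1]. As a minimal defensive sketch (assuming you keep the current "open;close" value format), you could guard the loop body like this:

// Sketch only: skip values that are not in the mapper's "open;close" form,
// e.g. values that have already passed through a combiner.
for (Text val : values) {
    String[] temp_present = val.toString().split(";");
    if (temp_present.length < 2) {
        // Malformed or already-combined value; ignore it instead of indexing [1].
        continue;
    }
    float open_float = Float.parseFloat(temp_present[0].trim());
    float close_float = Float.parseFloat(temp_present[1].trim());
    // ... existing max/min bookkeeping ...
}

The cleaner fix is to remove the job.setCombinerClass(Reduce.class) line (or write a combiner whose output format matches the mapper's output), because a combiner's output must be consumable by the reducer exactly as if it had come straight from the map phase. The counters in your log (Combine input records=1256, Combine output records=46) show that the combiner did run and reshape the data before the reduce failed.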