我正在构建mapreducer应用程序,它以带有随机数的.txt作为输入,我想接收这样的输出信息:
最大数量:xx算术平均值:xx几何平均值:xx中位数:xx
我的代码:
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NumCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text number = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
number.set(itr.nextToken());
context.write(number, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
List<Integer> numList = new ArrayList<Integer>();
for (IntWritable val : values) {
numList.add(val.get());
}
// Max number from file
int maxNumber = Collections.max(numList,null);
// Arithmetic average
float sum = 0;
for (int i : numList)
sum += i;
float arithmeticAverage = sum / numList.size();
// Geometric average
sum = 1;
for (int i : numList)
sum *= i;
double geometricAverage = Math.pow(sum, (float)1/numList.size());
// Median
float median;
if (numList.size() % 2 == 0)
median = (float)(numList.get(numList.size()/2) + numList.get(numList.size()/2 - 1))/2;
else
median = numList.get(numList.size()/2);
String summary = "Max number: " + maxNumber + "nArithmetic avg: " + arithmeticAverage + "nGeometric avg: " + geometricAverage + "nMedian" + median;
result.set(summary);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "number count");
job.setJarByClass(NumCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我的代码的问题是,我收到了一个错误,我无法将字符串放入IntWritable(看起来很逻辑,但我如何解析字符串值以输出?(
result.set(summary);
更重要的是,当我尝试做这样的事情时:
result.set(median);
我没有收到中值,相反,我收到了糟糕的输出,这是来自输入文件的数字列表,其中包含";1〃;附近的
我对hadoop完全是个新手,我不知道该怎么做,有什么建议吗;x
由于您有String summary
,显然答案是使用Text
而不是IntWritable
。。。如果要返回多个值,其中多个值不是整数,则不要使用IntWritable
。
此外,这种逻辑甚至是不正确的,因为所有相等的数字最终都在同一个归约器中,所以";maxNumber";例如,永远不会是总的最大值,因此您将拥有与唯一输入值相同的reducer输出值。解决方案是使用NullWritable
作为reducer键(和mapper键输出(,将所有数字强制到一个reducer中,以便可以对它们进行最大化/平均化/求和等。您也不需要numList
,因为Iterable<IntWritable>
已经可以迭代了;你应该只需要一个循环来完成所有的计算,除了中位数,你需要首先对数字进行排序。
我个人的建议是使用Spark或Hive进行统计分析,而不是简单的Mapreduce。。。