我是Apache Hadoop的初学者,并尝试了Apache的字数统计程序,它工作得很好。但是现在我想制作自己的户外温度程序来计算每日平均值。平均计算没有像我预期的那样工作;不进行数据的组合和平均。
更具体地说,这是我的 sample2.txt 输入文件的一部分:
25022016 00:00:00 -10.3
25022016 00:01:00 -10.3
25022016 00:02:00 -10.3
25022016 00:03:00 -10.3
...
25022016 00:59:00 -11.2
我想要的输出应该是:
25022016 7.9
这是该日期所有温度观测值的平均值。所以我有 60 个观察结果,想要一个平均值。将来,我想使用相同的程序在更多的日子里处理更多的观察结果。1.列是日期(文本),2。时间,第三个是温度。温度计算在代码中以 Java 的浮点数据类型完成。
现在发生的情况是输出是:
25022016 -10.3
25022016 -10.3
25022016 -10.3
25022016 -10.3
...
25022016 -11.2
因此,计算每个观测值的平均值(从一个数字计算一个数字的平均值)。我想要 60 个观察的平均值(一个数字)!
所以我的输入和输出文件在上面。我的Java代码(我在Windows 7 -> VirtualBox-> Ubuntu 64位上运行它)如下:
package hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.commons.cli.Options;
public class ProcessUnits2
{
public static class E_EMapper extends
Mapper<Object, Text, Text, FloatWritable>
{
private FloatWritable temperature = new FloatWritable();
private Text date = new Text();
public void map(Object key, Text value,
Context context) throws IOException, InterruptedException
{
StringTokenizer dateTimeTemperatures = new StringTokenizer(value.toString());
while(dateTimeTemperatures.hasMoreTokens()) {
date.set(dateTimeTemperatures.nextToken());
while(dateTimeTemperatures.hasMoreTokens()) {
dateTimeTemperatures.nextToken();
temperature.set(Float.parseFloat(dateTimeTemperatures.nextToken()));
context.write(date, temperature);
}
}
}
}
public static class E_EReduce extends Reducer<Text,Text,Text,FloatWritable>
{
private FloatWritable result = new FloatWritable();
public void reduce( Text key, Iterable<FloatWritable> values, Context context
) throws IOException, InterruptedException
{
float sumTemperatures=0, averageTemperature;
int countTemperatures=0;
for (FloatWritable val : values) {
sumTemperatures += val.get();
countTemperatures++;
}
averageTemperature = sumTemperatures / countTemperatures;
result.set(averageTemperature);
context.write(key, result);
}
}
public static void main(String args[])throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "VuorokaudenKeskilampotila");
job.setJarByClass(ProcessUnits2.class);
job.setMapperClass(E_EMapper.class);
job.setCombinerClass(E_EReduce.class);
job.setReducerClass(E_EReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
---------------------------------------------------
Hadoop版本是2.7.2和Ubuntu 14.04 LTS。我在独立模式下运行 hadoop (最基本的设置)。
以下是我用来构建程序的命令(如果有帮助的话?
rm -rf output2
javac -Xdiags:verbose -classpath hadoop-core-1.2.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar -d units2 ProcessUnits2.java
jar -cvf units2.jar -C units2/ .
hadoop jar units2.jar hadoop.ProcessUnits2 input2 output2
cat output2/part-m-00000
作为一个初学者,我很困惑,在我看来,Hadoop并没有在这里做任何组合和减少(=平均)工作,它的默认设置应该是它最最终的目的。我承认我从这里和那里(示例)中选择了代码,因为没有任何效果,我确信这只是解决的一小步,但我无法猜测它是什么。例如,我可以轻松地做到这一点C++根本没有任何映射缩减框架,但问题是我希望基本操作正常工作,以便我可以继续更复杂的示例,并在最终生产使用和真正的分布式映射组合中减少。
我会非常感谢任何形式的帮助。我现在被困在这个(很多很多小时...如果您需要任何额外的数据来帮助找到解决方案,我会发送它们。
您没有正确实现化简器。它应该是:
public static class E_EReduce extends Reducer<Text, FloatWritable, Text, FloatWritable>
{
@Override
public void reduce( Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
{
永远不要忘记@Override
,否则编译器不会捕获错误。
现在我注意到问题出在哪里:
线:
job.setNumReduceTasks(0);
说没有减速器。我将其更改为job.setNumReduceTasks(1);
,甚至将其完全删除,现在程序正在运行。为什么会出现在那里?=>因为遇到麻烦,您尝试了一切可能,没有时间阅读文档。
感谢所有参与的人。我继续研究这个系统。