Average order_demand per product as output - MapReduce - Java



I am new to the topic of MapReduce and still in the learning phase, so thanks in advance for your help and any further tips. For a university exercise I have the following problem: from a CSV file (an example is shown below), I want to calculate the average order_demand for each Product_Code.

The code shown below ("FrequencyMapper", "FrequencyReducer") runs on my server, and I think my current problem is with how the output is displayed. Since this is my first time working with MapReduce, I appreciate any help.

The mapper, reducer, and driver code are listed below.

Sample dataset (CSV file):

Product_Code,Warehouse,Product_Category,Date,Order_Demand
Product_0993,Whse_J,Category_028,2012/7/27,100
Product_0979,Whse_J,Category_028,2012/6/5,500 
Product_0979,Whse_E,Category_028,2012/11/29,500 
Product_1157,Whse_E,Category_006,2012/6/4,160000 
Product_1159,Whse_A,Category_006,2012/7/17,50000 

My desired output, for example:

Product_0979   500
Product_1157   105000
...

FrequencyMapper.java:

package ma.test.a02;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FrequencyMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable offset, Text lineText, Context context)
            throws IOException, InterruptedException {

        String line = lineText.toString();

        if (line.contains("Product")) {
            String productcode = line.split(",")[0];

            float orderDemand = Float.parseFloat(line.split(",")[4]);

            context.write(new Text(productcode), new IntWritable((int) orderDemand));
        }
    }
}

FrequencyReducer.java:

package ma.test.a02;

import java.io.IOException;

import javax.xml.soap.Text;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class FrequencyReducer extends Reducer<Text, IntWritable, IntWritable, FloatWritable> {

    public void reduce(IntWritable productcode, Iterable<IntWritable> orderDemands, Context context)
            throws IOException, InterruptedException {

        float averageDemand = 0;
        float count = 0;
        for (IntWritable orderDemand : orderDemands) {
            averageDemand += orderDemand.get();
            count += 1;
        }

        float result = averageDemand / count;

        context.write(productcode, new FloatWritable(result));
    }
}

Frequency.java (driver):

package ma.test.a02;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Frequency {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: Average <input path> <output path>");
            System.exit(-1);
        }

        // create a Hadoop job and set the main class
        Job job = Job.getInstance();
        job.setJarByClass(Frequency.class);
        job.setJobName("MA-Test Average");

        // set the input and output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // set the Mapper and Reducer class
        job.setMapperClass(FrequencyMapper.class);
        job.setReducerClass(FrequencyReducer.class);

        // specify the type of the output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Tip 1: In your mapper you filter for lines containing "VOLUME" with the following line:

if(line.contains("VOLUME")) {
}

But no line contains "VOLUME", so your reducer gets no input!
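
If you want to check whether your mapper actually emits any records, a minimal sketch (the counter group and name here are made-up labels for this example, not part of your code) is to increment a custom counter right before context.write and read it from the job's counter summary after the run:

// Inside map(), just before the existing context.write(...) call.
// "FrequencyMapper" / "EmittedRecords" are hypothetical counter names.
context.getCounter("FrequencyMapper", "EmittedRecords").increment(1);
context.write(new Text(productcode), new IntWritable((int) orderDemand));

If this counter stays at zero, the filter condition is dropping every line.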

Tip 2: Your reducer's output value is a FloatWritable, so you should use this line in your runner (the Frequency class):

job.setOutputValueClass(FloatWritable.class);

instead of this:

job.setOutputValueClass(IntWritable.class);

Tip 3: In the reducer, change this line:

public class FrequencyReducer extends Reducer<IntWritable ,  IntWritable ,  IntWritable ,  FloatWritable> 

to this:

public class FrequencyReducer extends Reducer<Text, IntWritable, Text, FloatWritable>

Also add these lines to the Frequency class:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

Tip 4: The first line of the CSV file describes the structure of the file, and it will cause problems. Reject it by putting the following lines at the start of the map method (an alternative is sketched right after this snippet):

if(line.contains("Product_Code,Warehouse")) {
return;
}
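
A common alternative, assuming every input file starts with exactly one header line, is to skip the record at byte offset 0 instead of matching the header text:

// Skip the header row; only valid under the assumption that the header
// is always the very first line of each input file.
if (offset.get() == 0) {
    return;
}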

Tip 5: In a real program, make sure you have a plan for strings in orderDemand that cannot be converted to an integer.
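
A minimal sketch of such a guard could look like the following (the counter names are illustrative, and silently skipping the bad record is only one possible policy):

int orderDemand;
try {
    orderDemand = Integer.parseInt(line.split(",")[4].trim());
} catch (NumberFormatException e) {
    // The field is not a valid integer (empty, stray text, ...):
    // count it and skip the record instead of failing the whole task.
    context.getCounter("FrequencyMapper", "BadOrderDemand").increment(1);
    return;
}
context.write(new Text(line.split(",")[0].trim()), new IntWritable(orderDemand));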

Finally, your mapper will be:

public class FrequencyMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable offset, Text lineText, Context context)
            throws IOException, InterruptedException {
        String line = lineText.toString();
        if (line.contains("Product_Code,Warehouse")) {
            return;
        }
        if (line.contains("Product")) {
            String productcode = line.split(",")[0].trim();
            int orderDemand = Integer.valueOf(line.split(",")[4].trim());
            context.write(new Text(productcode), new IntWritable(orderDemand));
        }
    }
}

And this is your reducer:

public class FrequencyReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {

    public void reduce(Text productcode, Iterable<IntWritable> orderDemands, Context context)
            throws IOException, InterruptedException {
        float averageDemand = 0;
        float count = 0;
        for (IntWritable orderDemand : orderDemands) {
            averageDemand += orderDemand.get();
            count += 1;
        }
        float result = averageDemand / count;
        context.write(productcode, new FloatWritable(result));
    }
}

And this is your runner (driver):

public class Frequency {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: Average <input path> <output path>");
            System.exit(-1);
        }

        // create a Hadoop job and set the main class
        Job job = Job.getInstance();
        job.setJarByClass(Frequency.class);
        job.setJobName("MA-Test Average");

        // set the input and output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // set the Mapper and Reducer class
        job.setMapperClass(FrequencyMapper.class);
        job.setReducerClass(FrequencyReducer.class);

        // specify the type of the output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
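
If you want to double-check the averages the job writes to its output files, a minimal local sanity check, written as plain Java and not part of the MapReduce job (the file name demand.csv is just a placeholder), can compute the same per-product averages directly from the CSV:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class LocalAverageCheck {
    public static void main(String[] args) throws Exception {
        Map<String, float[]> stats = new HashMap<>(); // product code -> {sum, count}
        for (String line : Files.readAllLines(Paths.get("demand.csv"))) {
            if (line.isEmpty() || line.startsWith("Product_Code,Warehouse")) {
                continue; // skip blank lines and the header row
            }
            String[] fields = line.split(",");
            float demand = Float.parseFloat(fields[4].trim());
            float[] s = stats.computeIfAbsent(fields[0].trim(), k -> new float[2]);
            s[0] += demand; // running sum of Order_Demand
            s[1] += 1;      // number of records for this product
        }
        stats.forEach((product, s) -> System.out.println(product + "\t" + (s[0] / s[1])));
    }
}

The printed lines should match what the job writes for each Product_Code.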