我有如下CSV文件.我正在编写mapReduce程序,该程序计算在特定日期销售最多的产品。
CSV 数据
为此,映射器的输出应为以下形式
09-1-2=>[产品1,产品2,产品1,产品2,产品4,.....]
我编写了映射器代码如下
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] arrLine = line.split(",");
String strDateTime = arrLine[0];
String strDate = strDateTime.substring(0, strDateTime.indexOf(" "));
String strProductName = arrLine[1];
Map products = new HashMap<String, String>();
String strProdAdded = null;
if(products.get(strDate)!= null)
{
strProdAdded = products.get(strDate).toString();
strProdAdded += strProductName + ",";
products.put(strDate, strProdAdded);
}else
{
products.put(strDate, strProductName);
}
output.collect(new Text(strDate), new Text(strProductName));
}
但是我无法弄清楚获得所需输出的确切方法,如下所示
09-1-2=>[产品1,产品2,产品1,产品2,产品4,.....]
你将不得不使用cleanup()
方法,我添加了System.out
语句,以便你可以理解方法中发生了什么。 在此处查看可用于映射器类的可用方法。
public static class StackMapper extends Mapper<Object, Text, Text, Text> {
private Map<Text, ArrayList<Text>> products = new HashMap<Text, ArrayList<Text>>();
private ArrayList<Text> p = new ArrayList<Text>();
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] arrLine = line.split(",");
Text strDate = new Text(arrLine[0].substring(0, arrLine[0].indexOf(" ")));
Text strProductName = new Text(arrLine[1]);
if(products.containsKey(strDate))
{
if(!products.get(strDate).contains(strProductName)) {
System.out.println("has date: " + strDate + " " + strProductName + " not exist, added to list: " + p.toString());
p.add(strProductName);
}
System.out.println("has date: " + strDate + ", " + strProductName + " added to list: " + p.toString());
}else
{
p = new ArrayList<Text>();
p.add(strProductName);
System.out.println("new date: " + strDate + ", " + strProductName + " added to list: " + p.toString());
}
products.put(new Text(strDate), p);
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
for ( Text date : products.keySet()){
context.write(date, new Text(products.get(date).toString()));
}
}
}
输入:
1/2/09 6:17,product1,f3,f4,f5
1/2/09 6:17,product2,f3,f4,f5
1/2/09 6:17,product3,f3,f4,f5
1/2/09 6:17,product4,f3,f4,f5
1/2/09 6:17,product4,f3,f4,f5
1/2/10 6:17,product1,f3,f4,f5u
1/2/10 6:17,product2,f3,f4,f5u
1/2/10 6:17,product3,f3,f4,f5u
1/2/11 6:17,product2,f3,f4,f5u
1/2/12 6:17,product2,f3,f4,f5u
1/2/12 6:17,product3,f3,f4,f5u
输出:
1/2/09 [product1, product2, product3, product4]
1/2/10 [product1, product2, product3]
1/2/12 [product2, product3]
1/2/11 [product2]
MR 作业的标准输出:
new date: 1/2/09, product1 added to list: [product1]
has date: 1/2/09 product2 not exist, added to list: [product1]
has date: 1/2/09, product2 added to list: [product1, product2]
has date: 1/2/09 product3 not exist, added to list: [product1, product2]
has date: 1/2/09, product3 added to list: [product1, product2, product3]
has date: 1/2/09 product4 not exist, added to list: [product1, product2, product3]
has date: 1/2/09, product4 added to list: [product1, product2, product3, product4]
has date: 1/2/09, product4 added to list: [product1, product2, product3, product4]
new date: 1/2/10, product1 added to list: [product1]
has date: 1/2/10 product2 not exist, added to list: [product1]
has date: 1/2/10, product2 added to list: [product1, product2]
has date: 1/2/10 product3 not exist, added to list: [product1, product2]
has date: 1/2/10, product3 added to list: [product1, product2, product3]
new date: 1/2/11, product2 added to list: [product2]
new date: 1/2/12, product2 added to list: [product2]
has date: 1/2/12 product3 not exist, added to list: [product2]
has date: 1/2/12, product3 added to list: [product2, product3]
在我看来,您期望的输出可能是 Reduce 作业的结果。这本质上是一个 GroupBy date 查询实现,据我了解,它由 reduce job 处理。根据您的程序,我想您正在从地图作业中输出正确的键和值。一旦它们被分类、洗牌并传递给减速器,您将能够看到特定日期的所有产品分组在一起。
附言如果有机会,我会使用 Pig 或 Hive 来解决这个问题,除非您这样做是为了尝试使用 Map-Reduce。