映射器输出为逗号分隔值



我有如下CSV文件.我正在编写mapReduce程序,该程序计算在特定日期销售最多的产品。

CSV 数据

为此,映射器的输出应为以下形式

09-1-2=>[产品1,产品2,产品1,产品2,产品4,.....]

我编写了映射器代码如下

public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    String line = value.toString();
    String[] arrLine = line.split(",");
    String strDateTime        = arrLine[0];     
    String strDate  = strDateTime.substring(0, strDateTime.indexOf(" ")); 
    String strProductName = arrLine[1];
    Map products = new HashMap<String, String>();
    String strProdAdded = null;
    if(products.get(strDate)!= null)
    {
        strProdAdded = products.get(strDate).toString();
        strProdAdded += strProductName + ",";
        products.put(strDate, strProdAdded);
    }else
    {
        products.put(strDate, strProductName);
    }
    output.collect(new Text(strDate), new Text(strProductName));
}

但是我无法弄清楚获得所需输出的确切方法,如下所示

09-1-2=>[产品1,产品2,产品1,产品2,产品4,.....]

你将不得不使用cleanup()方法,我添加了System.out语句,以便你可以理解方法中发生了什么。 在此处查看可用于映射器类的可用方法。

public static class StackMapper extends Mapper<Object, Text, Text, Text> {
        private Map<Text, ArrayList<Text>> products = new HashMap<Text, ArrayList<Text>>();
        private ArrayList<Text> p = new ArrayList<Text>();
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
               String line = value.toString();
               String[] arrLine = line.split(",");
               Text strDate  = new Text(arrLine[0].substring(0, arrLine[0].indexOf(" "))); 
               Text strProductName = new Text(arrLine[1]);
               if(products.containsKey(strDate))
                {
                    if(!products.get(strDate).contains(strProductName)) {
                        System.out.println("has date: " + strDate + " " + strProductName + " not exist, added to list: " + p.toString());
                        p.add(strProductName);
                    }  
                    System.out.println("has date: " + strDate + ", "  + strProductName + " added to list: " + p.toString());      
                }else
                {
                    p = new ArrayList<Text>();
                    p.add(strProductName);
                    System.out.println("new date: " + strDate + ", " + strProductName + " added to list: " + p.toString());  
                }
               products.put(new Text(strDate), p);   
        }  
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            for ( Text date : products.keySet()){
                context.write(date, new Text(products.get(date).toString()));
            }
        }
    }

输入:

1/2/09 6:17,product1,f3,f4,f5
1/2/09 6:17,product2,f3,f4,f5
1/2/09 6:17,product3,f3,f4,f5
1/2/09 6:17,product4,f3,f4,f5
1/2/09 6:17,product4,f3,f4,f5
1/2/10 6:17,product1,f3,f4,f5u
1/2/10 6:17,product2,f3,f4,f5u
1/2/10 6:17,product3,f3,f4,f5u
1/2/11 6:17,product2,f3,f4,f5u
1/2/12 6:17,product2,f3,f4,f5u
1/2/12 6:17,product3,f3,f4,f5u

输出:

1/2/09  [product1, product2, product3, product4]
1/2/10  [product1, product2, product3]
1/2/12  [product2, product3]
1/2/11  [product2]

MR 作业的标准输出:

new date: 1/2/09, product1 added to list: [product1]
has date: 1/2/09 product2 not exist, added to list: [product1]
has date: 1/2/09, product2 added to list: [product1, product2]
has date: 1/2/09 product3 not exist, added to list: [product1, product2]
has date: 1/2/09, product3 added to list: [product1, product2, product3]
has date: 1/2/09 product4 not exist, added to list: [product1, product2, product3]
has date: 1/2/09, product4 added to list: [product1, product2, product3, product4]
has date: 1/2/09, product4 added to list: [product1, product2, product3, product4]
new date: 1/2/10, product1 added to list: [product1]
has date: 1/2/10 product2 not exist, added to list: [product1]
has date: 1/2/10, product2 added to list: [product1, product2]
has date: 1/2/10 product3 not exist, added to list: [product1, product2]
has date: 1/2/10, product3 added to list: [product1, product2, product3]
new date: 1/2/11, product2 added to list: [product2]
new date: 1/2/12, product2 added to list: [product2]
has date: 1/2/12 product3 not exist, added to list: [product2]
has date: 1/2/12, product3 added to list: [product2, product3]

在我看来,您期望的输出可能是 Reduce 作业的结果。这本质上是一个 GroupBy date 查询实现,据我了解,它由 reduce job 处理。根据您的程序,我想您正在从地图作业中输出正确的键和值。一旦它们被分类、洗牌并传递给减速器,您将能够看到特定日期的所有产品分组在一起。

附言如果有机会,我会使用 Pig 或 Hive 来解决这个问题,除非您这样做是为了尝试使用 Map-Reduce。

相关内容

  • 没有找到相关文章

最新更新