Reducer不能按键对不同的映射器进行分组



用例:

  1. 文件1包含印象数据,其中包含trackerId +其他字段
  2. 文件2包含点击详情包含trackerId +点击

我使用不同的映射器以上两个和一个减速器,但似乎减速器是无法结合两个文件的数据。

package com.hadoop.intellipaat;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.google.common.collect.Lists;
/**
 * This job will combine click and impression on TrackerId
 * 
 * @author raghunandangupta
 *
 */
public class JoinClickImpressionDetailJob {
    public static final String IMPRESSION_PREFIX = "IMPRESSION_PREFIX";
    public static final String CLICK_PREFIX = "CLICK_PREFIX";
    public static final String SEPERATOR = "~";
    private static class ImpressionMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            /**
             * Excluding header
             */
            if (!(value.toString().indexOf("accountId") != -1)) {
                String words[] = value.toString().split(",");
                if (words.length > 18) {
                    context.write(new Text(words[18].trim()), new Text(IMPRESSION_PREFIX + SEPERATOR + value.toString()));
                }
            } else {
                context.write(new Text(""), value);
            }
        }
    }
    private static class ClickMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String words[] = value.toString().split(",");
            if (words.length > 18) {
                context.write(new Text(words[18].trim()), new Text(CLICK_PREFIX + SEPERATOR + value.toString()));
            } else {
                context.write(new Text(""), new Text("1"));
            }
        }
    }
    private static class ImpressionClickReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) {
            try {
                System.out.println("=========="+key.toString());
                if (key.toString().length() != 0) {
                    List<Text> myList = Lists.newArrayList(values);
                    for(Text t : myList){
                        System.out.println("#######"+t.toString());
                    }
                    System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@");
                    if (myList.size() == 2) {
                        if (myList.get(0).toString().indexOf(IMPRESSION_PREFIX) != -1 && myList.get(1).toString().indexOf(CLICK_PREFIX) != -1) {
                            String line = myList.get(0).toString().split(SEPERATOR)[1] + ",1";
                            context.write(key, new Text(line));
                        } else if (myList.get(1).toString().indexOf(IMPRESSION_PREFIX) != -1
                                && myList.get(0).toString().indexOf(CLICK_PREFIX) != -1) {
                            String line = myList.get(1).toString().split(SEPERATOR)[1] + ",1";
                            context.write(key, new Text(line));
                        }
                    }
                }
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // conf.set("mapreduce.output.fileoutputformat.compress", "true");
            // conf.set("mapreduce.output.fileoutputformat.compress.codec",
            // "org.apache.hadoop.io.compress.GzipCodec");
            // conf.set("mapreduce.map.output.compress.codec",
            // "org.apache.hadoop.io.compress.SnappyCodec");
            // conf.set("mapreduce.output.fileoutputformat.compress.type",
            // "BLOCK");
            Job job = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB");
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setReducerClass(ImpressionClickReducer.class);
            FileInputFormat.setInputDirRecursive(job, true);
            // FileInputFormat.addInputPath(job, new Path(args[0]));
            // job.setMapperClass(ImpressionMapper.class);
            /**
             * Here directory of impressions will be present
             */
            MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
            /**
             * Here directory of clicks will be present
             */
            MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

任何线索将不胜感激。

File 1 [trackerId1,record1]File2 [treackerId1, Clicked]

在reducer中我得到:

trackerId,[record1,record1]理想情况下应该是trackerId ,[record1,clicked]

您的问题很可能与减速器中的这一行有关:

List<Text> myList = Lists.newArrayList(values);

要记住的主要事情是Iterable<Text> values在迭代时重用它给你的Text对象。因此,您可能会向数组中添加两个Text对象,但它们指向同一个对象。

如果你看看Lists.newArrayList()是如何工作的,它只是向数组中添加对象,而不是创建一个新的。

所以如果你要使用Text对象,你需要在每次向数组中添加值时创建一个新的对象。这就是人们在这种情况下使用string的典型原因。快速检查是否存在问题的方法是将此代码更改为如下内容:

List<Text> myList = new ArrayList<Text>();
for (Text v : values) {
    myList.add(new Text(v));
}

因此,每次都创建一个新的Text

相关内容

  • 没有找到相关文章

最新更新