Can't see printouts from the map task



I have set up mapred-site.xml [1] and log4j.properties [2] to show debug logs, and I have also added System.out.println calls in my mapper class [3], but I don't see anything printed. I have looked through the Hadoop logs and the task logs, but there is no output from the map task in either. I don't understand why this is happening. Any help?

[1] mapred-site.xml

    ~/Programs/hadoop$ cat etc/hadoop/mapred-site.xml
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
    (...)
     <property> <name>mapreduce.map.log.level</name> <value>DEBUG</value> </property>
    </configuration>

[2] log4j.properties

    hadoop.root.logger=DEBUG,console
    hadoop.log.dir=.
    hadoop.log.file=hadoop.log
    # Define the root logger to the system property "hadoop.root.logger".
    log4j.rootLogger=${hadoop.root.logger}, EventCounter
    # Logging Threshold
    log4j.threshold=ALL
    # Null Appender
    log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
    #
    # Rolling File Appender - cap space usage at 5gb.
    #
    hadoop.log.maxfilesize=256MB
    hadoop.log.maxbackupindex=20
    log4j.appender.RFA=org.apache.log4j.RollingFileAppender
    log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
    log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
    log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}
    log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
    # Pattern format: Date LogLevel LoggerName LogMessage
    log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    # Debugging Pattern format
    #log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

    #
    # Daily Rolling File Appender
    #
    log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
    # Rollover at midnight
    log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
    # 30-day backup
    #log4j.appender.DRFA.MaxBackupIndex=30
    log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
    # Pattern format: Date LogLevel LoggerName LogMessage
    log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    # Debugging Pattern format
    #log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

    #
    # console
    # Add "console" to rootlogger above if you want to use this 
    #
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
    #
    # TaskLog Appender
    #
    #Default values
    hadoop.tasklog.taskid=null
    hadoop.tasklog.iscleanup=false
    hadoop.tasklog.noKeepSplits=4
    hadoop.tasklog.totalLogFileSize=100
    hadoop.tasklog.purgeLogSplits=true
    hadoop.tasklog.logsRetainHours=12
    log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
    log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
    log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
    log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
    log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
    log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    #
    # HDFS block state change log from block manager
    #
    # Uncomment the following to suppress normal block state change
    # messages from BlockManager in NameNode.
    #log4j.logger.BlockStateChange=WARN
    #
    #Security appender
    #
    hadoop.security.logger=INFO,NullAppender
    hadoop.security.log.maxfilesize=256MB
    hadoop.security.log.maxbackupindex=20
    log4j.category.SecurityLogger=${hadoop.security.logger}
    hadoop.security.log.file=SecurityAuth-${user.name}.audit
    log4j.appender.RFAS=org.apache.log4j.RollingFileAppender 
    log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
    log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
    log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize}
    log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex}
    #
    # Daily Rolling Security appender
    #
    log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender 
    log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
    log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout
    log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
    log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd
    #
    # hadoop configuration logging
    #
    # Uncomment the following line to turn off configuration deprecation warnings.
    # log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN
    #
    # hdfs audit logging
    #
    hdfs.audit.logger=INFO,NullAppender
    hdfs.audit.log.maxfilesize=256MB
    hdfs.audit.log.maxbackupindex=20
    log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
    log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
    log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
    log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
    log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
    log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
    log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}
    #
    # mapred audit logging
    #
    mapred.audit.logger=INFO,NullAppender
    mapred.audit.log.maxfilesize=256MB
    mapred.audit.log.maxbackupindex=20
    log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}
    log4j.additivity.org.apache.hadoop.mapred.AuditLogger=false
    log4j.appender.MRAUDIT=org.apache.log4j.RollingFileAppender
    log4j.appender.MRAUDIT.File=${hadoop.log.dir}/mapred-audit.log
    log4j.appender.MRAUDIT.layout=org.apache.log4j.PatternLayout
    log4j.appender.MRAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
    log4j.appender.MRAUDIT.MaxFileSize=${mapred.audit.log.maxfilesize}
    log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}
    # Custom Logging levels
    log4j.logger.org.apache.hadoop=DEBUG
    # Jets3t library
    log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
    # AWS SDK & S3A FileSystem
    log4j.logger.com.amazonaws=ERROR
    log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
    log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN
    #
    # Event Counter Appender
    # Sends counts of logging messages at different severity levels to Hadoop Metrics.
    #
    log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
    #
    # Job Summary Appender 
    #
    # Use following logger to send summary to separate file defined by 
    # hadoop.mapreduce.jobsummary.log.file :
    # hadoop.mapreduce.jobsummary.logger=DEBUG,JSA
    # 
    hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
    hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
    hadoop.mapreduce.jobsummary.log.maxfilesize=256MB
    hadoop.mapreduce.jobsummary.log.maxbackupindex=20
    log4j.appender.JSA=org.apache.log4j.RollingFileAppender
    log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
    log4j.appender.JSA.MaxFileSize=${hadoop.mapreduce.jobsummary.log.maxfilesize}
    log4j.appender.JSA.MaxBackupIndex=${hadoop.mapreduce.jobsummary.log.maxbackupindex}
    log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
    log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
    log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
    log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false

    log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger}
    log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=false
    log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender
    log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/${yarn.server.resourcemanager.appsummary.log.file}
    log4j.appender.RMSUMMARY.MaxFileSize=256MB
    log4j.appender.RMSUMMARY.MaxBackupIndex=20
    log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
    log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n

[3] My mapper class:

    static abstract class SampleMapBase
            extends Mapper<Text, Text, Text, Text>{
        private long total;
        private long kept = 0;
        private float keep;
        protected void setKeep(float keep) {
            this.keep = keep;
        }
        protected void emit(Text key, Text val, OutputCollector<Text,Text> out)
                throws IOException {
            System.out.println("Key: " + key.toString() + " Value: " + val.toString());
            System.out.println("Key: " + key.getClass() + " Value: " + val.getClass());
            ++total;
            while((float) kept / total < keep) {
                ++kept;
                out.collect(key, val);
            }
        }
    }

    public static class MySampleMapper extends SampleMapBase{
        String inputFile;
        public void setup(Context context) {
            this.inputFile = ((FileSplit) context.getInputSplit()).getPath().toString();
            setKeep(context.getConfiguration().getFloat("hadoop.sort.map.keep.percent", (float) 100.0) / (float) 100.0);
        }
        public void map(Text key, Text val,
                        OutputCollector<Text,Text> output, Reporter reporter)
                throws IOException {
            emit(key, val, output);
            System.out.println("EMIT");
        }
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: webdatascan -Dfile.path=<path> [<in>...] <out>");
            System.exit(2);
        }
        System.setProperty("file.path", parser.getConfiguration().get("file.path"));
        List<String> remoteHosts = new ArrayList<String>();// directory where the map output is remotely
        remoteHosts.add(Inet4Address.getLocalHost().getHostAddress());
        // first map tasks
        JobConf conf = MyWebDataScan.addJob(1, true, true);
        Job job = new Job(conf, conf.getJobName());
        job.setJarByClass(MyWebDataScan.class);
        job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
        job.setMapperClass(MySampleMapper.class);
        job.setReducerClass(MySampleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        Path[] inputPaths = new Path[otherArgs.length-1];
        for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
        Path outputPath =  new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);
        job.getConfiguration().set("mapred.output.dir", outputPath.toString());
        JobClient.runJob(job);
    }

You have mixed the old API and the new API. Your input and output format classes use classes from the new API, but your map function is declared with the old API's signature. Since you import org.apache.hadoop.mapreduce.Mapper, the map function should be declared along these lines:

    private static class RecordMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {
        public void map(LongWritable lineOffset, Text record, Context output)
                throws IOException, InterruptedException {
            output.write(new Text("Count"), new LongWritable(1));
        }
    }
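
With the new API, the single Context parameter replaces the old OutputCollector/Reporter pair: records are emitted with context.write(...), and progress or status reporting goes through the same object (for example context.setStatus(...) or context.getCounter(...)).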

When you instead declare a map function with the old API's signature, as below, it will never be called. The framework falls back to the default (identity) map implementation, as described in this video:

    public void map(Text key,
                    Text val,
                    OutputCollector<Text,Text> output,
                    Reporter reporter)
            throws IOException
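
As a minimal sketch of the fix (my own rewrite of the question's MySampleMapper, reusing its names and configuration key as assumptions, not the asker's actual code), the whole mapper can be written against the new API only, so that map() is actually invoked and the println calls fire:

    // Hypothetical rewrite of MySampleMapper against the new API only.
    // Uses org.apache.hadoop.mapreduce.Mapper and
    // org.apache.hadoop.mapreduce.lib.input.FileSplit.
    public static class MySampleMapper extends Mapper<Text, Text, Text, Text> {
        private String inputFile;
        private float keep;  // fraction of records to keep
        private long total;  // records seen so far
        private long kept;   // records emitted so far

        @Override
        protected void setup(Context context) {
            inputFile = ((FileSplit) context.getInputSplit()).getPath().toString();
            keep = context.getConfiguration()
                          .getFloat("hadoop.sort.map.keep.percent", 100.0f) / 100.0f;
        }

        @Override
        protected void map(Text key, Text val, Context context)
                throws IOException, InterruptedException {
            // This now runs for every record and prints to the task's stdout log.
            System.out.println("Key: " + key + " Value: " + val);
            ++total;
            while ((float) kept / total < keep) {
                ++kept;
                context.write(key, val);
            }
        }
    }

Note also that System.out.println output never goes through the log4j configuration shown above: it is captured in the per-task stdout file under the container's log directory, which you can browse from the job's web UI or, on YARN, fetch with yarn logs -applicationId <application id>.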
