Hadoop mapreduce空输入格式



我只需要执行一次map任务,而不需要在hdfs上输入目录。

这是OozieLauncherInputFormat,它们只执行一次map,但使用映射的API。

我想实现mapreduce API。我试着像他们一样。

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
 *
 */
public class EmptyInputFormat extends InputFormat<Object, Object> {
    boolean isReadingDone = false;
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        ArrayList<InputSplit> splits = new ArrayList<>();
        splits.add(new EmptySplits());
        return splits;
    }
    @Override
    public RecordReader<Object, Object> createRecordReader(InputSplit split,
                                                           TaskAttemptContext context) throws IOException, InterruptedException {
        return new RecordReader<Object, Object>() {
            @Override
            public void initialize(InputSplit split,
                                   TaskAttemptContext context) throws IOException, InterruptedException {
            }
            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                if (isReadingDone) {
                    return false;
                } else {
                    isReadingDone = true;
                    return true;
                }
            }
            @Override
            public Object getCurrentKey() throws IOException, InterruptedException {
                return new Object();
            }
            @Override
            public Object getCurrentValue() throws IOException, InterruptedException {
                return new Object();
            }
            @Override
            public float getProgress() throws IOException, InterruptedException {
                if (isReadingDone) {
                    return 1.0f;
                } else {
                    return 0.0f;
                }
            }
            @Override
            public void close() throws IOException {
            }
        };
    }
    public static class EmptySplits extends InputSplit implements Writable {
        @Override
        public long getLength() throws IOException, InterruptedException {
            return 0L;
        }
        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            return new String[0];
        }
        @Override
        public void write(DataOutput out) throws IOException {
        }
        @Override
        public void readFields(DataInput in) throws IOException {
        }
    }
}

但是我已经让Mapper执行了

Error: java.lang.ClassCastException: java.lang.Object cannot be cast to java.lang.Long
        at ListenerMapper.map(ListenerMapper.java:19)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

ListenerMapper类

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import lib.Parameters;
import exception.ListenerException;
/**
 * Mapper class for Listener
 */
public class ListenerMapper extends Mapper<Long, Text, Text, Text> {
    @Override
    protected void map(Long key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parameters = new HashMap<>();
        Configuration configuration = new Configuration();
        for (Map.Entry<String, String> entry : configuration) {
            parameters.put(entry.getKey(), entry.getValue());
        }
        configuration.addResource(new Path("file:///",
                                           System.getProperty(
                                                   "oozie.action.conf.xml")));
        TaskAttemptID attemptId = null;
        String command = System.getProperties().getProperty("sun.java.command");
        for (String arg : command.split(" ")) {
            if (arg.startsWith("attempt_")) { 
                attemptId = TaskAttemptID.forName(arg);
                break;
            }
        }
        if (attemptId != null) {
            parameters.put(Parameters.MAPREDUCE_ATTEMPT_ID,
                           attemptId.toString());
            parameters.put(Parameters.MAPREDUCE_TASK_ID,
                           attemptId.getTaskID().toString());
            parameters.put(Parameters.MAPREDUCE_JOB_ID,
                           attemptId.getJobID().toString());
        }
        System.out.println("parameters = " + parameters);
        try {
            Main.listenerBootstrap(parameters);
        } catch (ListenerException le) {
            throw new RuntimeException(le);
        }

    }
}

我通过在Mapper中使用Object泛型来解决它

public class ListenerMapper extends Mapper<Object, Object, Object, Object>

相关内容

  • 没有找到相关文章

最新更新