我只需要执行一次map任务,而不需要在hdfs上输入目录。
这是OozieLauncherInputFormat,它们只执行一次map,但使用映射的API。
我想实现mapreduce API。我试着像他们一样。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
*
*/
public class EmptyInputFormat extends InputFormat<Object, Object> {
boolean isReadingDone = false;
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
ArrayList<InputSplit> splits = new ArrayList<>();
splits.add(new EmptySplits());
return splits;
}
@Override
public RecordReader<Object, Object> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new RecordReader<Object, Object>() {
@Override
public void initialize(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isReadingDone) {
return false;
} else {
isReadingDone = true;
return true;
}
}
@Override
public Object getCurrentKey() throws IOException, InterruptedException {
return new Object();
}
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
return new Object();
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (isReadingDone) {
return 1.0f;
} else {
return 0.0f;
}
}
@Override
public void close() throws IOException {
}
};
}
public static class EmptySplits extends InputSplit implements Writable {
@Override
public long getLength() throws IOException, InterruptedException {
return 0L;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[0];
}
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void readFields(DataInput in) throws IOException {
}
}
}
但是我已经让Mapper执行了
Error: java.lang.ClassCastException: java.lang.Object cannot be cast to java.lang.Long
at ListenerMapper.map(ListenerMapper.java:19)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
ListenerMapper类
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import lib.Parameters;
import exception.ListenerException;
/**
* Mapper class for Listener
*/
public class ListenerMapper extends Mapper<Long, Text, Text, Text> {
@Override
protected void map(Long key, Text value, Context context)
throws IOException, InterruptedException {
Map<String, String> parameters = new HashMap<>();
Configuration configuration = new Configuration();
for (Map.Entry<String, String> entry : configuration) {
parameters.put(entry.getKey(), entry.getValue());
}
configuration.addResource(new Path("file:///",
System.getProperty(
"oozie.action.conf.xml")));
TaskAttemptID attemptId = null;
String command = System.getProperties().getProperty("sun.java.command");
for (String arg : command.split(" ")) {
if (arg.startsWith("attempt_")) {
attemptId = TaskAttemptID.forName(arg);
break;
}
}
if (attemptId != null) {
parameters.put(Parameters.MAPREDUCE_ATTEMPT_ID,
attemptId.toString());
parameters.put(Parameters.MAPREDUCE_TASK_ID,
attemptId.getTaskID().toString());
parameters.put(Parameters.MAPREDUCE_JOB_ID,
attemptId.getJobID().toString());
}
System.out.println("parameters = " + parameters);
try {
Main.listenerBootstrap(parameters);
} catch (ListenerException le) {
throw new RuntimeException(le);
}
}
}
我通过在Mapper
中使用Object
泛型来解决它
public class ListenerMapper extends Mapper<Object, Object, Object, Object>