我有一个驱动程序类,它使用MultipleInputFormat类在运行时调用不同的映射器。但是,当我在第一个 for 循环中使用MultipleInputs.addInputPath(job, fStatus.getPath(), TextInputFormat.class,CreatePureDeltaMapperOne.class)
时,我的第一个映射器(CreatePureDeltaMapperOne)没有被调用。当我注释从第一个 for 循环调用多重输入格式的代码块并从外部调用它时,将调用映射器类。请帮我找到问题。
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/***
* Creates the pure delta file by matching the history records present in HDFS
* @author Debajit
*
*/
public class CreatePureDeltaDriver {
/**
* @param args
* @throws URISyntaxException
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
String historyFileInputPath="";
String deltaFileDirectoryPath="";
String pureDeltaFileOutPath="";
Configuration config= new Configuration();
Job job = new Job(config, "Pure Delta File Creation");
job.setJarByClass(CreatePureDeltaDriver.class);
Path historyDirPath= new Path(historyFileInputPath);
FileSystem fs = FileSystem.get(config);
FileStatus[] statusHistory = fs.listStatus(historyDirPath);
for (FileStatus fStatus : statusHistory) {
String historyFileName=fStatus.getPath().getName();
if(historyFileName.contains("part-r")){
MultipleInputs.addInputPath(job, fStatus.getPath(), TextInputFormat.class,CreatePureDeltaMapperOne.class);
}
}
Path deltaDirPath= new Path(deltaFileDirectoryPath);
FileStatus[] statusDelta = fs.listStatus(deltaDirPath);
for (FileStatus fStatus : statusDelta) {
String deltaFileName=fStatus.getPath().getName();
if(deltaFileName.startsWith("part-r")){
MultipleInputs.addInputPath(job, fStatus.getPath(), TextInputFormat.class, CreatePureDeltaMapperTwo.class);
}
}
job.setMapperClass(CreatePureDeltaMapperOne.class);
job.setMapperClass(CreatePureDeltaMapperTwo.class);
job.setReducerClass(CreatePureDeltaReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path hisInPath = new Path(historyFileInputPath);
Path outPath = new Path(pureDeltaFileOutPath);
//MultipleInputs.addInputPath(job, hisInPath, TextInputFormat.class, CreatePureDeltaMapperOne.class);
//MultipleInputs.addInputPath(job, delPath, TextInputFormat.class, CreatePureDeltaMapperTwo.class);
FileOutputFormat.setOutputPath(job, outPath);
System.out.println(job.waitForCompletion(true));
}
}
我的映射器类
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
public class CreatePureDeltaMapperOne extends Mapper<LongWritable, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
int counter=0;
private String delimiter="";
private int primaryKeyIndicator =0;
private Integer numMapNodes = null;
public void setup(Context context) throws IOException{
System.out.println("SETUP--- Mapper 1");
Configuration config = context.getConfiguration();
Properties properties = new Properties();
String propertyDirectory = config.get("propertyDirectory");
String propertyFileName =config.get("propertyFileName");
Path propertyDirPath= new Path(propertyDirectory);
FileSystem fs = FileSystem.get(config);
FileStatus[] status = fs.listStatus(propertyDirPath);
for (FileStatus fStatus : status) {
String propFileName=fStatus.getPath().getName().trim();
if(propFileName.equals(propertyFileName)){
properties.load(new InputStreamReader(fs.open(fStatus.getPath())));
this.setNumMapNodes(Integer.parseInt(properties.getProperty("num.of.nodes").trim()));
this.setDelimiter(properties.getProperty("file.delimiter.type").trim());
this.setPrimaryKeyIndicator(Integer.parseInt(properties.getProperty("file.primary.key.index.specifier").trim()));
}
}
}
public void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException{
String valueString = val.toString().trim();
String[] tokens = valueString.split(this.getDelimiter());
String temp=tokens[this.getPrimaryKeyIndicator()].toString();
System.out.println(" MAPPER 1 invoked");
this.setOutKey(new Text(tokens[this.getPrimaryKeyIndicator()].toString().trim()));//Account number
this.setOutValue(new Text("h"+valueString.trim()));
context.write(outKey,outValue );
}
}
不要在代码中使用这两行:job.setMapperClass(CreatePureDeltaMapperOne.class);job.setMapperClass(CreatePureDeltaMapperTwo.class);
因为您已经在循环中传递相应类的名称。希望对您有所帮助。