映射器在使用 multipleInputFormat 时未调用



我有一个驱动程序类,它使用MultipleInputFormat类在运行时调用不同的映射器。但是,当我在第一个 for 循环中使用MultipleInputs.addInputPath(job, fStatus.getPath(), TextInputFormat.class,CreatePureDeltaMapperOne.class)时,我的第一个映射器(CreatePureDeltaMapperOne)没有被调用。当我注释从第一个 for 循环调用多重输入格式的代码块并从外部调用它时,将调用映射器类。请帮我找到问题。

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/***
 * Creates the pure delta file by matching the history records present in HDFS
 * @author Debajit
 *
 */
public class CreatePureDeltaDriver {
    /** 
     * @param args
     * @throws URISyntaxException 
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {

            String historyFileInputPath=""; 
            String deltaFileDirectoryPath="";
            String pureDeltaFileOutPath="";
            Configuration config= new Configuration();
            Job job = new Job(config, "Pure Delta File Creation");
            job.setJarByClass(CreatePureDeltaDriver.class); 
            Path historyDirPath= new Path(historyFileInputPath);
            FileSystem fs = FileSystem.get(config); 
                    FileStatus[] statusHistory = fs.listStatus(historyDirPath);
                    for (FileStatus fStatus : statusHistory) {
                    String historyFileName=fStatus.getPath().getName();
                    if(historyFileName.contains("part-r")){
                        MultipleInputs.addInputPath(job, fStatus.getPath(), TextInputFormat.class,CreatePureDeltaMapperOne.class);
                    }
                }

            Path deltaDirPath= new Path(deltaFileDirectoryPath);
                    FileStatus[] statusDelta = fs.listStatus(deltaDirPath);
                    for (FileStatus fStatus : statusDelta) {
                        String deltaFileName=fStatus.getPath().getName();
                    if(deltaFileName.startsWith("part-r")){
                        MultipleInputs.addInputPath(job, fStatus.getPath(), TextInputFormat.class, CreatePureDeltaMapperTwo.class);
                    }
            }
            job.setMapperClass(CreatePureDeltaMapperOne.class);
            job.setMapperClass(CreatePureDeltaMapperTwo.class);
            job.setReducerClass(CreatePureDeltaReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path hisInPath = new Path(historyFileInputPath);
            Path outPath = new Path(pureDeltaFileOutPath);
            //MultipleInputs.addInputPath(job, hisInPath, TextInputFormat.class, CreatePureDeltaMapperOne.class);
            //MultipleInputs.addInputPath(job, delPath, TextInputFormat.class, CreatePureDeltaMapperTwo.class);
                FileOutputFormat.setOutputPath(job, outPath);
            System.out.println(job.waitForCompletion(true));
        }
    }

我的映射器类

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class CreatePureDeltaMapperOne extends Mapper<LongWritable, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();
    int counter=0;
    private String delimiter="";
    private int primaryKeyIndicator =0;
private Integer numMapNodes = null;
    public void setup(Context context) throws IOException{
        System.out.println("SETUP--- Mapper 1");
        Configuration config = context.getConfiguration();
        Properties properties = new Properties();
        String propertyDirectory = config.get("propertyDirectory"); 
        String propertyFileName =config.get("propertyFileName");
        Path propertyDirPath= new Path(propertyDirectory);
        FileSystem fs = FileSystem.get(config);
        FileStatus[] status = fs.listStatus(propertyDirPath);
        for (FileStatus fStatus : status) {
            String propFileName=fStatus.getPath().getName().trim();
            if(propFileName.equals(propertyFileName)){
                properties.load(new InputStreamReader(fs.open(fStatus.getPath())));
                this.setNumMapNodes(Integer.parseInt(properties.getProperty("num.of.nodes").trim()));
                this.setDelimiter(properties.getProperty("file.delimiter.type").trim());
                this.setPrimaryKeyIndicator(Integer.parseInt(properties.getProperty("file.primary.key.index.specifier").trim()));
            }
        }
    }

    public void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException{
        String valueString = val.toString().trim();

        String[] tokens = valueString.split(this.getDelimiter());
        String temp=tokens[this.getPrimaryKeyIndicator()].toString();
        System.out.println(" MAPPER 1 invoked");
        this.setOutKey(new Text(tokens[this.getPrimaryKeyIndicator()].toString().trim()));//Account number
        this.setOutValue(new Text("h"+valueString.trim()));
        context.write(outKey,outValue );

    }

}

不要在代码中使用这两行:job.setMapperClass(CreatePureDeltaMapperOne.class);job.setMapperClass(CreatePureDeltaMapperTwo.class);

因为您已经在循环中传递相应类的名称。希望对您有所帮助。

相关内容

  • 没有找到相关文章

最新更新