通过通用选项 -files 使用 Hadoop 分布式缓存



我在读《Hadoop in Action》一书时,书中提到:除了在程序中把小文件添加到分布式缓存之外,也可以使用 -files 通用选项来完成。

当我在代码的 setup() 中尝试这种做法时,fs.open() 抛出了 FileNotFoundException,异常信息中显示了一个我不确定的路径。

问题是:如果使用 -files 通用选项,文件默认会被复制到 HDFS 中的哪个位置?

我尝试执行的代码如下。

public class JoinMapSide2 extends Configured implements Tool{
/*  Program     : JoinMapSide2.java 
    Description : Passing the small file via GenericOptionsParser
                  hadoop jar JoinMapSide2.jar -files orders.txt .........
    Input       : /data/patent/orders.txt(local file system), /data/patent/customers.txt
    Output      : /MROut/JoinMapSide2
    Date        : 23/03/2015  
*/
/**
 * Mapper for the replicated (map-side) join: loads the first distributed-cache
 * file into an in-memory table in {@link #setup}, then joins every input
 * record against that table in {@link #map}.
 */
protected static class MapClass extends Mapper<Text, Text, NullWritable, Text> {

    // Background data from the cache file: text before the first comma -> rest of the line.
    private Hashtable<String, String> joinData = new Hashtable<String, String>();

    /**
     * Fills {@code joinData} from the first cache file by opening its URI
     * through the job's {@code FileSystem}.
     */
    protected void setup(Context context) throws IOException, InterruptedException {
        // File system that belongs to the job configuration.
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());

        // URIs of the files distributed with the job.
        URI[] cacheUris = context.getCacheFiles();
        String firstCachePath = cacheUris[0].toString();
        System.out.println("Cache File Path:" + firstCachePath);

        // Only the first cache file is used; this check runs after element 0
        // has already been read above.
        if (cacheUris.length <= 0) {
            System.err.println("No Cache Files are distributed");
            return;
        }

        // NOTE(review): the stack trace reported for this listing shows a
        // FileNotFoundException for a path ending in "#orders.txt", i.e. the
        // URI fragment ends up inside the path that is opened here.
        FSDataInputStream cacheStream = fileSystem.open(new Path(firstCachePath));
        LineReader lineReader = new LineReader(cacheStream);
        Text record = new Text();
        try {
            // readLine returns the number of bytes consumed; stop when nothing was read.
            while (lineReader.readLine(record) > 0) {
                // Split once, on the first comma only.
                String[] keyAndRest = record.toString().split(",", 2);
                joinData.put(keyAndRest[0], keyAndRest[1]);
            }
        } finally {
            lineReader.close();
            cacheStream.close();
        }
    }

    /**
     * Emits {@code key,value,joinValue} with a null key reference for every
     * input record whose key is present in {@code joinData}; other records
     * produce no output.
     */
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        String matched = joinData.get(key.toString());
        if (matched == null) {
            return;
        }
        Text joined = new Text(key.toString() + "," + value.toString() + "," + matched);
        context.write((NullWritable) null, joined);
    }
}   
/**
 * Configures and runs the map-only join job.
 *
 * @param args {@code args[0]} is the input path (customers.txt) and
 *             {@code args[1]} the output directory; generic options such as
 *             {@code -files} are not part of this array when the tool is
 *             started through {@code ToolRunner}
 * @return 0 when the job succeeds, 1 when it fails, 2 on a usage error
 * @throws Exception if the job cannot be configured or submitted
 */
@Override
public int run(String[] args) throws Exception {
    if (args.length < 2){
        System.err.println("Usage JoinMapSide -files <smallFile> <inputFile> <outputFile>");
        // Stop here: continuing would index past the end of args.
        return 2;
    }
    Path inFile  = new Path(args[0]); // input file (customers.txt)
    Path outFile = new Path(args[1]); // output directory
    Configuration conf = getConf();
    // delimiter KeyValueTextInputFormat uses to split each input line into key and value
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
    Job job = Job.getInstance(conf, "Map Side Join2");
    // The small file is not added here; it is supplied with the -files generic option.
    FileInputFormat.addInputPath(job, inFile);
    FileOutputFormat.setOutputPath(job, outFile);
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setJarByClass(JoinMapSide2.class);
    job.setMapperClass(MapClass.class);
    // map-only job: no reducers
    job.setNumReduceTasks(0);
    // Report the job outcome to the caller instead of always returning 0.
    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Command-line entry point: runs the tool through {@code ToolRunner} and
 * exits the JVM with the value returned by {@code run}.
 */
public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new JoinMapSide2(), args));
}

以下是我在堆栈跟踪中看到的异常:

Error: java.io.FileNotFoundException: File does not exist: /tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:64)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)

我是这样启动作业的:

hadoop jar JoinMapSide2.jar -files orders.txt /data/patent/join/customers.txt /MROut/JoinMapSide2

任何提示都会很有帮助,谢谢。

首先你需要

将你的 orders.txt 移动到 HDFS,并且必须使用 -files 选项

好的,

经过一些搜索,我确实发现上面的代码中有 2 个错误。

  1. 我不应该使用 FSDataInputStream 来读取分布式缓存文件:它已经是运行 mapper 的节点上的本地文件,应该使用 File 来读取
  2. 我不应该使用 URI.toString(),而应该使用为该文件创建的符号链接名,也就是 orders.txt

我已经更正了下面列出的代码,希望它有所帮助。

package org.samples.hina.training;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Replicated (map-side) join that receives its small "background" file
 * through the {@code -files} generic option instead of calling
 * {@code job.addCacheFile()} in the driver.
 *
 * <pre>
 *   Program     : JoinMapSide2.java
 *   Description : To learn Replicated Join using Distributed Cache via Generic Options -files
 *   Input       : file:/patent/join/orders1.txt (distributed to all nodes), /data/patent/customers.txt
 *   Output      : /MROut/JoinMapSide2
 *   Date        : 24/03/2015
 * </pre>
 */
public class JoinMapSide2 extends Configured implements Tool {

    /**
     * Mapper that loads the first distributed-cache file into memory in
     * {@link #setup} and joins every input record against it in {@link #map}.
     */
    protected static class MapClass extends Mapper<Text, Text, NullWritable, Text> {

        // Background data: text before the first comma -> rest of the line.
        private final Hashtable<String, String> joinData = new Hashtable<String, String>();

        /**
         * Fills {@code joinData} from the local copy of the first cache file.
         * When no cache file was distributed, a message is printed to stderr
         * and the table stays empty, so {@code map} emits nothing.
         *
         * @param context task context used to look up the cache file URIs
         * @throws IOException if the local copy cannot be opened or read
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            // Check before indexing: the array may be null or empty when the
            // job was started without -files.
            if (cacheFiles == null || cacheFiles.length == 0) {
                System.err.println("Local Cache File does not exist");
                return;
            }
            System.out.println("File1:" + cacheFiles[0].toString());

            // Read the LOCAL copy through its link name in the working
            // directory, not through the cache URI itself.
            File localFile = new File(localLinkName(cacheFiles[0]));
            // Decode explicitly as UTF-8 rather than with the platform default
            // charset, so keys compare equal to those decoded from Text in map().
            BufferedReader joinReader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(localFile), "UTF-8"));
            try {
                String line;
                while ((line = joinReader.readLine()) != null) {
                    // Split once, on the first comma only.
                    String[] tokens = line.split(",", 2);
                    if (tokens.length < 2) {
                        // Blank line or line without a comma: nothing to join on.
                        continue;
                    }
                    joinData.put(tokens[0], tokens[1]);
                }
            } finally {
                joinReader.close();
            }
        }

        /**
         * Returns the name under which a cache file is expected in the task's
         * working directory: the URI fragment (the part after {@code #}) when
         * present, otherwise the last component of the URI path.
         */
        private static String localLinkName(URI cacheFile) {
            String fragment = cacheFile.getFragment();
            if (fragment != null && fragment.length() > 0) {
                return fragment;
            }
            return new Path(cacheFile.getPath()).getName();
        }

        /**
         * Emits {@code key,value,joinValue} for every input record whose key
         * is present in the background data; other records are dropped.
         */
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
                // Use the NullWritable singleton instead of a null reference as the key.
                context.write(NullWritable.get(),
                        new Text(key.toString() + "," + value.toString() + "," + joinValue));
            }
        }
    }

    /**
     * Configures and runs the map-only join job.
     *
     * @param args {@code args[0]} is the input path (customers.txt) and
     *             {@code args[1]} the output directory
     * @return 0 when the job succeeds, 1 when it fails, 2 on a usage error
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage JoinMapSide2 <inputFile> <outputFile>");
            // Stop here: continuing would index past the end of args.
            return 2;
        }
        Path inFile = new Path(args[0]);  // input file (customers.txt)
        Path outFile = new Path(args[1]); // output directory
        Configuration conf = getConf();
        // delimiter KeyValueTextInputFormat uses to split each input line into key and value
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf, "Map Side Join2");
        // No addCacheFile() call: the small file is supplied with the -files generic option.
        FileInputFormat.addInputPath(job, inFile);
        FileOutputFormat.setOutputPath(job, outFile);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(JoinMapSide2.class);
        job.setMapperClass(MapClass.class);
        // map-only job: no reducers
        job.setNumReduceTasks(0);
        // Report the job outcome to the caller instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point: runs the tool through {@code ToolRunner} and
     * exits the JVM with the value returned by {@link #run}.
     */
    public static void main(String args[]) throws Exception {
        int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);
        System.exit(ret);
    }
}

相关内容

  • 没有找到相关文章

最新更新