分布式缓存文件Hadoop



我想将不同的文件附加到不同的化简器。是否可以在Hadoop中使用分布式缓存技术?

我能够将相同的文件(文件)附加到所有化简器。但是由于内存限制,我想知道是否可以将不同的文件附加到不同的化简器。

如果这是一个无知的问题,请原谅我。

请帮忙!

提前感谢!

此外,

可能值得尝试使用内存计算/数据网格技术,如GridGain,Infinispan等。这样,您可以将数据加载到内存中,并且对于如何使用数据相关性将计算作业(映射/减少)映射到任何数据没有任何限制。

这是一个

奇怪的愿望,因为任何化简器都不绑定到特定的节点,并且在执行过程中,化简器可以在任何节点甚至节点上运行(如果出现故障或推测性执行)。因此,所有化简器都应该是同质的,唯一不同的是它们处理的数据。

所以我想当你说你想把不同的文件放在不同的化简器上时,你实际上想把不同的文件放在化简器上,这些文件应该对应于这些化简器将要处理的数据(键)。

我知道的唯一一种方法是将您的数据放在HDFS上,并在减速器开始处理数据时从它读取它。

package com.a;
import javax.security.auth.login.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class PrefixNew4Reduce4 extends MapReduceBase implements Reducer<Text, Text,   Text, Text>{
//  @SuppressWarnings("unchecked")

 ArrayList<String> al = new ArrayList<String>();
public void configure(JobConf conf4)
{
    String from = "home/users/mlakshm/haship"; 
    OutputStream dst = null;
    try {
        dst = new BufferedOutputStream(new FileOutputStream(to, false));
    } catch (FileNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } /* src (hdfs file) something like hdfs://127.0.0.1:8020/home/rystsov/hi                                         */

    FileSystem fs = null;
    try {
        fs = FileSystem.get(new URI(from), conf4);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (URISyntaxException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    FSDataInputStream src;
    try {
        src = fs.open(new Path(from));
        String val = src.readLine();
        StringTokenizer st = new StringTokenizer(val);
        al.add(val);

        System.out.println("val:----------------->"+val);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

    public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

        StringTokenizer stk = new StringTokenizer(key.toString());
        String t = stk.nextToken();
        String i = stk.nextToken();
        String j = stk.nextToken();
    ArrayList<String> al1 = new ArrayList<String>();
           for(int i = 0; i<al.size(); i++)
            {
                     boolean a = (al.get(i).equals(i)) || (al.get(i).equals(j));
                     if(a==true)
                     {
                         output.collect(key, new Text(al.get(i));                              
                     }

      while(values.hasNext())
          {
             String val = values.next().toString();
             al1.add(val);
      }
for(int i = 0; i<al1.size(); i++)
{
output.collect(key, new Text(al1.get(i));
}

最新更新