在MapReduce中,如何将数组列表作为值从映射器发送到化简器



我们如何将数组列表作为值从映射器传递到化简器。

我的代码基本上有一定的规则可以使用,并且会根据规则创建新值(字符串)。我正在列表中维护所有输出(规则执行后生成),现在需要将此输出(映射器值)发送到 Reducer,但没有办法这样做。

有人可以给我指出一个方向吗

添加代码

package develop;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import utility.RulesExtractionUtility;
public class CustomMap{

    public static class CustomerMapper extends Mapper<Object, Text, Text, Text> {
        private Map<String, String> rules;
        @Override
        public void setup(Context context)
        {
            try
            {
                URI[] cacheFiles = context.getCacheFiles();
                setupRulesMap(cacheFiles[0].toString());
            }
            catch (IOException ioe)
            {
                System.err.println("Error reading state file.");
                System.exit(1);
            }
        }
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//          Map<String, String> rules = new LinkedHashMap<String, String>();
//          rules.put("targetcolumn[1]", "ASSIGN(source[0])");
//          rules.put("targetcolumn[2]", "INCOME(source[2]+source[3])");
//          rules.put("targetcolumn[3]", "ASSIGN(source[1]");
//          Above is the "rules", which would basically create some list values from source file
            String [] splitSource = value.toString().split(" ");
            List<String>lists=RulesExtractionUtility.rulesEngineExecutor(splitSource,rules);
//          lists would have values like (name, age) for each line from a huge text file, which is what i want to write in context and pass it to the reducer.
//          As of now i havent implemented the reducer code, as m stuck with passing the value from mapper.
//          context.write(new Text(), lists);---- I do not have a way of doing this

        }


        private void setupRulesMap(String filename) throws IOException
        {
            Map<String, String> rule = new LinkedHashMap<String, String>();
            BufferedReader reader = new BufferedReader(new FileReader(filename));
            String line = reader.readLine();
            while (line != null)
            {
                String[] split = line.split("=");
                rule.put(split[0], split[1]);
                line = reader.readLine();
                // rules logic
            }
            rules = rule;
        }
    }
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException {

    Configuration conf = new Configuration();
    if (args.length != 2) {
        System.err.println("Usage: customerMapper <in> <out>");
        System.exit(2);
    }
    Job job = Job.getInstance(conf);
    job.setJarByClass(CustomMap.class);
    job.setMapperClass(CustomerMapper.class);
    job.addCacheFile(new URI("Some HDFS location"));

    URI[] cacheFiles= job.getCacheFiles();
    if(cacheFiles != null) {
        for (URI cacheFile : cacheFiles) {
            System.out.println("Cache file ->" + cacheFile);
        }
    }
    // job.setReducerClass(Reducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

要将数组列表从映射器传递到化简器,很明显对象必须实现可写接口。你为什么不试试这个库?

<dependency>
    <groupId>org.apache.giraph</groupId>
    <artifactId>giraph-core</artifactId>
    <version>1.1.0-hadoop2</version>
</dependency>

它有一个抽象类:

public abstract class ArrayListWritable<M extends org.apache.hadoop.io.Writable>
extends ArrayList<M>
implements org.apache.hadoop.io.Writable, org.apache.hadoop.conf.Configurable

您可以创建自己的类和源代码,填充抽象方法并使用代码实现接口方法。例如:

public class MyListWritable extends ArrayListWritable<Text>{
    ...
}

一种方法(可能不是唯一的,也不是最好的),是

  1. 在字符串中序列化列表以将其传递给映射器中的输出值

  2. 在化简器中读取输入值时,从字符串反序列化和重建列表

如果这样做,则还应删除包含序列化列表的字符串中的所有特殊符号(例如nt等符号)。实现此目的的一种简单方法是使用 base64 编码字符串。

您应该发送Text对象而不是String对象。然后,您可以在减速器中使用object.toString()。请务必正确配置驱动程序。

如果您发布代码,我们将进一步帮助您。

相关内容

  • 没有找到相关文章

最新更新