在Hadoop MapReduce中的MultineReadedMapper类的内部线程映射器之间共享大型对象



我在Java中使用一个简单的 hadoop作业,带有映射器,该映射器会逐条处理我的文件。每个映射器都不是cpu结合的,而应在内存中保存一个非常大的对象(在我的情况下是一个绽放过滤器),哪个大小为2-15千兆字节(取决于计算精度)。在Mapper的setup()方法中,我从磁盘读取此对象并创建它。

我遇到了MultithreadedMapper类以在多个线程中执行我的计算。

job.setMapperClass(MultithreadMapper.class);
// ...
MultithreadedMapper.setMapperClass(job, MySingleThreadMapper.class);
MultithreadedMapper.setNumberOfThreads(job, 16);

但是,MultithreadedMapper似乎使用内部private class MapRunner extends Thread来产生线程映射:

public class MultithreadedMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
//...
    public void run(Context context) throws IOException, InterruptedException {
    // ...
        runners =  new ArrayList<MapRunner>(numberOfThreads);
        for(int i=0; i < numberOfThreads; ++i) {
            MapRunner thread = new MapRunner(context);
            thread.start();
            runners.add(i, thread);
        }
    }
}

这是一个问题:我如何一次在MultinReadedMapper中创建我非常大的对象,然后在群集节点(同一JVM)的线程映射之间共享(使用上下文或其他)?

我试图通过单身图案来实现它,但如果似乎不是一个美丽的解决方案。

preamble:我以前从未这样做过,但是我会使用静态锁来实现它以进行初始化:

static class MySingleThreadMapper extends Mapper<LongWritable, Text, Text, Text> {
    static MyResource sharedResource;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        synchronized (MySingleThreadMapper.class) {
             if (sharedResource == null) {
                 sharedResource = createResource();
             }
        }
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       // mapper code
       // sharedResource will be initialized here
    }
}

您可能已经知道,Hadoop在单独的JVM实例中减少了其地图并减少任务。因此,您所有的单线映射器都将在相同的JVM中运行,您可以依靠静态锁定。您可以将任何其他静态对象用作锁,您的共享资源只能初始化一次。

最新更新