我在Java中使用一个简单的 hadoop作业,带有映射器,该映射器会逐条处理我的文件。每个映射器都不是cpu结合的,而应在内存中保存一个非常大的对象(在我的情况下是一个绽放过滤器),哪个大小为2-15千兆字节(取决于计算精度)。在Mapper的setup()
方法中,我从磁盘读取此对象并创建它。
我遇到了MultithreadedMapper
类以在多个线程中执行我的计算。
job.setMapperClass(MultithreadMapper.class);
// ...
MultithreadedMapper.setMapperClass(job, MySingleThreadMapper.class);
MultithreadedMapper.setNumberOfThreads(job, 16);
但是,MultithreadedMapper
似乎使用内部private class MapRunner extends Thread
来产生线程映射:
public class MultithreadedMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
//...
public void run(Context context) throws IOException, InterruptedException {
// ...
runners = new ArrayList<MapRunner>(numberOfThreads);
for(int i=0; i < numberOfThreads; ++i) {
MapRunner thread = new MapRunner(context);
thread.start();
runners.add(i, thread);
}
}
}
这是一个问题:我如何一次在MultinReadedMapper中创建我非常大的对象,然后在群集节点(同一JVM)的线程映射之间共享(使用上下文或其他)?
我试图通过单身图案来实现它,但如果似乎不是一个美丽的解决方案。
preamble:我以前从未这样做过,但是我会使用静态锁来实现它以进行初始化:
static class MySingleThreadMapper extends Mapper<LongWritable, Text, Text, Text> {
static MyResource sharedResource;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
synchronized (MySingleThreadMapper.class) {
if (sharedResource == null) {
sharedResource = createResource();
}
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// mapper code
// sharedResource will be initialized here
}
}
您可能已经知道,Hadoop在单独的JVM实例中减少了其地图并减少任务。因此,您所有的单线映射器都将在相同的JVM中运行,您可以依靠静态锁定。您可以将任何其他静态对象用作锁,您的共享资源只能初始化一次。