Hadoop MapReduce MapContext.write()线程安全吗?



传统上,Hadoop MapReduce映射器顺序地处理数据,并通过写入映射上下文来发出值,如org.apache.hadoop.mapreduce.MapperAPI文档中摘录的示例所示:

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}

但是,如果映射器决定启动几个线程,每个线程做一部分工作(在映射器中有一种进一步的分而治之)呢?每一个都想在准备好时发出数据。

MapContext.write()(实际上是TaskInputOutputContext.write(KEYOUT key, VALUEOUT value))是否支持在映射器内并发调用?或者我必须同步对MapContext.write()的调用并保证它们是顺序调用的?(如果我应该而我没有,会发生什么样的坏事呢?)

(我期望回复说我不应该在一个映射器中启动多个线程。虽然我尊重这个观点,但实际上这就是我目前正在做的。

不,默认不是。这就是为什么它们允许您覆盖run()方法,该方法处理映射器操作流。

这是run的源代码和默认实现:

public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}

在Mapper javaDoc的最后,这个短语给出了一些提示:

应用程序可以覆盖run(org.apache.hadoop.mapreduce.Mapper.Context)法发挥对映射处理的更大控制,例如多线程映射等。

意思很清楚;你想要多线程环境下的线程安全,那很酷…但是你实现了逻辑。A非常基本的这方面的例子可以是(我同步所有,因为我不知道每个方法/元素的细节,仅为示例):

@Override
public void run(Context context) throws IOException, InterruptedException 
{
synchronized(lock)   //shared lock, class lock, etc..
{ 
setup(context);
while (context.nextKeyValue()) 
map(context.getCurrentKey(), context.getCurrentValue(), context);

cleanup(context);
}
}

虽然没有特别提到此方法的多线程,但map()中也发生了类似的事情:

protected void map

对输入中的每个键/值对调用一次分裂。大多数应用程序应该重写这个,但默认是恒等函数。


在resume中,重写这些方法(至少其中一个),应该允许您创建一个多线程Mapper扩展。

是的,它可以是线程安全的,但这是你的工作。默认实现不是.


Update -multithreadadedmapper

他们提供了一个多线程的可运行的映射器实现:无论如何,似乎关于映射/写操作的同步仍然是你的工作;这只是实现了从不同的池线程中调用run()的机制。

MultithreadedMapper

@link的多线程实现org.apache.hadoop.mapreduce.Mapper。它可以用来代替默认实现,MapRunner,当Map操作不是CPU时绑定以提高吞吐量。

使用MapRunnable的Mapper实现必须是线程安全的。

Map-Reduce作业必须配置要使用的映射器setMapperClass(Job, Class)和线程池中的线程数可以与getNumberOfThreads(JobContext)方法一起使用。默认的值是10个线程

这里的关键似乎是setMapperClass()方法;Mapper的自定义线程安全扩展应该作为参数传递。

——附录-MultithreadedZipContentLoader

有趣的部分在main()中,它定义了使用MultiMapper的协议
public class MultithreadedZipContentLoader {
public static class ZipContentMapper extends Mapper<Text, Text, DocumentURI, Text> 
{
private DocumentURI uri = new DocumentURI();

public void map(Text fileName, Text fileContent, Context context) 
throws IOException, InterruptedException 
{
uri.setUri(fileName.toString());
context.write(uri, fileContent);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: MultithreadedZipContentLoader configFile inputDir threadCount");
System.exit(2);
}

Job job = Job.getInstance(conf);
job.setJarByClass(MultithreadedZipContentLoader.class);
job.setInputFormatClass(ZipContentInputFormat.class);
job.setMapperClass(MultithreadedMapper.class);
MultithreadedMapper.setMapperClass(job, ZipContentMapper.class);
MultithreadedMapper.setNumberOfThreads(job, Integer.parseInt(args[2]));
job.setMapOutputKeyClass(DocumentURI.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(ContentOutputFormat.class);

ZipContentInputFormat.setInputPaths(job, new Path(otherArgs[1]));
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
class ZipContentInputFormat extends FileInputFormat<Text, Text> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}

@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new ZipContentReader();
}

}
...

这个例子唯一缺少的是扩展map方法中的同步部分,因为在这种情况下似乎没有必要(没有重复的文件uri,我猜)。