我正在尝试使用MapReduce处理大量文档,其想法是将文件拆分为mapper中的文档,并在reducer阶段应用stanford coreNLP注释器。
我有一个相当简单的(标准)管道"tokenize,ssplit,pos,lemma,ner",reducer只是调用一个函数,该函数将这些注释器应用于reducer传递的值并返回注释(作为List of Strings),但是生成的输出是垃圾。
我已经观察到,如果从映射器内部调用注释函数,作业将返回预期的输出,但这击败了整个并行性。此外,当我忽略在reducer中获得的值并仅在虚拟字符串上应用注释器时,作业返回预期的输出。
这可能表明在进程中有一些线程安全问题,但我无法找出在哪里,我的注释函数是同步的,管道是私有的final。
有人可以提供一些指针,如何解决这个问题?
-Angshu
编辑:这是我的减速器看起来像,希望这能增加更多的清晰度
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
output.collect(key, new Text(se.getExtracts(values.next().toString()).toString()));
}
}
}
下面是get摘要的代码:
final StanfordCoreNLP pipeline;
public instantiatePipeline(){
Properties props = new Properties();
props.put("annotators", "tokenize, ssplit, pos, lemma, ner");
}
synchronized List<String> getExtracts(String l){
Annotation document = new Annotation(l);
ArrayList<String> ret = new ArrayList<String>();
pipeline.annotate(document);
List<CoreMap> sentences = document.get(SentencesAnnotation.class);
int sid = 0;
for(CoreMap sentence:sentences){
sid++;
for(CoreLabel token: sentence.get(TokensAnnotation.class)){
String word = token.get(TextAnnotation.class);
String pos = token.get(PartOfSpeechAnnotation.class);
String ner = token.get(NamedEntityTagAnnotation.class);
String lemma = token.get(LemmaAnnotation.class);
Timex timex = token.get(TimeAnnotations.TimexAnnotation.class);
String ex = word+","+pos+","+ner+","+lemma;
if(timex!=null){
ex = ex+","+timex.tid();
}
else{
ex = ex+",";
}
ex = ex+","+sid;
ret.add(ex);
}
}
我解决了这个问题,实际上问题是我正在读取的文件中的文本编码(将其转换为文本会导致进一步的损坏),这会导致标记化和溢出垃圾的问题。我正在清理输入字符串并应用严格的UTF-8编码,现在一切都很好。