我想使用hadoop从原始日志中获取和解析异常。我遇到一个问题,即某些异常(跨越多行)将是 2 个不同拆分的一部分,因此是 2 个不同的映射器的一部分。
我有一个避免这个问题的想法。我可以覆盖getSplits()
方法,使每个拆分都有一点冗余数据。 我认为这个解决方案对我来说成本太高了。
那么有没有人有更好的解决方案来解决这个问题呢?
我会选择预处理工作,在那里您可以使用XML标记异常。接下来,您可以使用XMLInputformat
来处理文件。(这只是解决方案的开始,根据您的反馈,我们可能会使事情更加具体)
此链接提供了编写自己的 XMLinputformat 的教程,您可以自定义该格式以查找"异常"特征。本教程的重点是这句话:
如果记录跨越输入拆分边界,则记录 读者会照顾好这一点,所以我们不必担心这一点。
我将复制粘贴网站的信息,因为它将来可能会离线,这对于将来查看此内容的人来说可能会非常令人沮丧:
输入格式:
package org.undercloud.mapreduce.example3;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class XmlInputFormat extends FileInputFormat {
public RecordReader getRecordReader(InputSplit input, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(input.toString());
return new XmlRecordReader(job, (FileSplit)input);
}
记录读取器:注意:读取超过拆分末尾的逻辑readUntilMatch
函数中,如果存在打开的标记,则读取超过拆分结束的函数。我认为这真的是你要找的!
package org.undercloud.mapreduce.example3;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class XmlRecordReader implements RecordReader {
private String startTagS = "";
private String endTagS = "";
private byte[] startTag;
private byte[] endTag;
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
public XmlRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
startTag = startTagS.getBytes();
endTag = endTagS.getBytes();
// Open the file and seek to the start of the split
start = split.getStart();
end = start + split.getLength();
Path file = split.getPath();
FileSystem fs = file.getFileSystem(job);
fsin = fs.open(split.getPath());
fsin.seek(start);
}
public boolean next(Text key, XmlContent value) throws IOException {
// Get the next line
if (fsin.getPos() < end) {
if (readUntilMatch(startTag, false)) {
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {
key.set(Long.toString(fsin.getPos()));
value.bufferData = buffer.getData();
value.offsetData = 0;
value.lenghtData = buffer.getLength();
return true;
}
}
finally {
buffer.reset();
}
}
}
return false;
}
private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
int i = 0;
while (true) {
int b = fsin.read(); // End of file -> T
if (b == -1) return false;
// F-> Save to buffer:
if (withinBlock) buffer.write(b);
if (b == match[i]) {
i++;
if (i >= match.length) return true;
} else i = 0;
// see if we’ve passed the stop point:
if(!withinBlock && i == 0 && fsin.getPos() >= end) return false;
}
}
public Text createKey() {
return new Text("");
}
public XmlContent createValue() {
return new XmlContent();
}
public long getPos() throws IOException {
return lineReader.getPos();
}
public void close() throws IOException {
lineReader.close();
}
public float getProgress() throws IOException {
return lineReader.getProgress();
}
}
最后是可写的:
package org.undercloud.mapreduce.example3;
import java.io.*;
import org.apache.hadoop.io.*;
public class XmlContent implements Writable{
public byte[] bufferData;
public int offsetData;
public int lenghtData;
public XmlContent(byte[] bufferData, int offsetData, int lenghtData) {
this.bufferData = bufferData;
this.offsetData = offsetData;
this.lenghtData = lenghtData;
}
public XmlContent(){
this(null,0,0);
}
public void write(DataOutput out) throws IOException {
out.write(bufferData);
out.writeInt(offsetData);
out.writeInt(lenghtData);
}
public void readFields(DataInput in) throws IOException {
in.readFully(bufferData);
offsetData = in.readInt();
lenghtData = in.readInt();
}
public String toString() {
return Integer.toString(offsetData) + ", "
+ Integer.toString(lenghtData) +", "
+ bufferData.toString();
}
}
这看起来是一个非常有用的教程,解决了跨越多个拆分的记录问题。如果您能够使此示例适应您的问题,请告诉我。
类 TextInputFormat 和 NLineInputFormat 可能会有所帮助。TextInputFormat 将按行拆分文件,因此如果异常以换行符结尾(并且其中不包含任何换行符),这应该可以工作。如果异常包含固定数量的行,则 NLineInputFormat 类应该是您想要的,因为您可以设置要采用的行数。
不幸的是,如果异常中可能包含可变数量的换行符,这将不起作用。
在这种情况下,我建议寻找Mahout的XmlInputFormat。它跨越了分裂的界限,因此适用于大多数东西。只需运行预处理器即可将异常放入 <exception></exception>
标记中,并将其指定为开始/结束标记。
示例预处理器,使用正则表达式识别异常
String input; //code this to the input string
String regex; //make this equal to the exception regex
BufferedWriter bw; //make this go to file where output will be stored
String toProcess = input;
boolean continueLoop = true;
while(continueLoop){
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(toProcess);
if(m.find()){
bw.write("<exception>"+toProcess.substring(m.start(),m.end())+"</exception>");
toProcess = toProcess.substring(m.end());
}else{
continueLoop = false;
}
}
感谢您的所有解决方案。我认为这对我有用
特别注意上面的评论
"如果记录跨越 InputSplit 边界,则记录 读者会照顾到这一点,所以我们不必担心 "这个。"
然后,我查看有关LineRecordReader如何读取数据表单拆分的源代码。 然后我发现实际上 LineRecordReader 有一些逻辑来读取跨越 InputSplit 边界的记录,导致拆分底部的行记录总是被拆分为 2 个不同的拆分由于块的大小限制。所以我认为我需要做的是添加 LineRecordReader 读取跨越分割边界的数据大小。
现在我的解决方案是:覆盖LineRecordReader中的方法"nextKeyValue()"。
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
while (pos < end) {
newSize = in.readLine(value, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
将行"而"更改为"而(pos <<end + {param})"
{param} 表示读取记录器跨拆分边界读取的冗余数据的大小。