我编写了一个mapreduce代码,用于将XML解析为CSV。但是在运行作业后,我没有在我的输出目录中找到任何输出。我不确定文件是否未被读取或未被写入。
这是我的全部代码。
public class XmlParser11
{
public static String outvalue;
public static class XmlInputFormat1 extends TextInputFormat {
public static final String START_TAG_KEY = "xmlinput.start";
public static final String END_TAG_KEY = "xmlinput.end";
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new XmlRecordReader();
}
public static class XmlRecordReader extends
RecordReader<LongWritable, Text> {
private byte[] startTag;
private byte[] endTag;
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private LongWritable key = new LongWritable();
private Text value = new Text();
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
System.out.println("B");
Configuration conf = context.getConfiguration();
startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
FileSplit fileSplit = (FileSplit) split;
// open the file and seek to the start of the split
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
fsin = fs.open(fileSplit.getPath());
fsin.seek(start);
}
@Override
public boolean nextKeyValue() throws IOException,
InterruptedException {
System.out.println("C");
if (fsin.getPos() < end) {
if (readUntilMatch(startTag, false)) {
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {
key.set(fsin.getPos());
value.set(buffer.getData(), 0,
buffer.getLength());
return true;
}
} finally {
buffer.reset();
}
}
}
return false;
}
@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public void close() throws IOException {
fsin.close();
}
@Override
public float getProgress() throws IOException {
return (fsin.getPos() - start) / (float) (end - start);
}
private boolean readUntilMatch(byte[] match, boolean withinBlock)
throws IOException {
int i = 0;
while (true) {
int b = fsin.read();
// end of file:
if (b == -1)
return false;
// save to buffer:
if (withinBlock)
buffer.write(b);
// check if we're matching:
if (b == match[i]) {
i++;
if (i >= match.length)
return true;
} else
i = 0;
// see if we've passed the stop point:
if (!withinBlock && i == 0 && fsin.getPos() >= end)
return false;
}
}
}
}
public static class Map extends Mapper<Text, Text,
Text, Text> {
@SuppressWarnings("unchecked")
@Override
protected void map(Text key, Text value,
@SuppressWarnings("rawtypes") Mapper.Context context)
throws
IOException, InterruptedException {
String document = value.toString();
System.out.println("‘" + document + "‘");
XMLInputFactory xmlif = XMLInputFactory.newInstance();
XMLStreamReader xmlr;
try {
xmlr = xmlif.createXMLStreamReader(new FileReader(document));
while(xmlr.hasNext())
{
printEvent(xmlr);
xmlr.next();
}
xmlr.close();
context.write(null,new Text (outvalue));
} catch (XMLStreamException e) {
e.printStackTrace();
}
}
private void printEvent(XMLStreamReader xmlr) {
switch (xmlr.getEventType()) {
case XMLStreamConstants.START_ELEMENT:
print(xmlr);
break;
case XMLStreamConstants.CHARACTERS:
int start = xmlr.getTextStart();
int length = xmlr.getTextLength();
System.out.print(new String(xmlr.getTextCharacters(),
start,
length));
break;
}
}
private String print(XMLStreamReader xmlr){
if(xmlr.hasName()){
for (int i=0; i < xmlr.getAttributeCount(); i++) {
String localName = xmlr.getLocalName();
if (localName != null);
String attName = xmlr.getAttributeLocalName(i);
String value = xmlr.getAttributeValue(i);
System.out.print(",");
String outvalue = localName +":"+ attName +"-"+value;
System.out.print(outvalue);
}
} return outvalue;
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("xmlinput.start", "<FICHER>");
conf.set("xmlinput.end", "</FICHER>");
Job job = new Job(conf);
job.setJarByClass(XmlParser11.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(XmlParser11.Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(XmlInputFormat1.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
这是腻子的输出
<>之前文件系统计数器FILE:读取的字节数=0 strong text>FILE: Number of bytes written=120678FILE:读取操作数=0FILE:大读操作数=0FILE:写操作数=0HDFS:读取的字节数=1762671HDFS:写入字节数=0HDFS:读取操作数=5HDFS:大读操作数=0HDFS:写操作数=2柜台工作启动的地图任务=1机架-局部地图任务=1所有地图占用槽位的总时间(ms)=15960所有人在占用的插槽中花费的总时间(ms)=0所有map任务花费的总时间(ms)=3990所有map任务占用的总vcore-秒=3990所有map任务占用的总兆秒数=16343040使用映射-规约模式框架映射输入记录=0映射输出记录=0输入分割字节=124把记录= 0没有打乱= 0合并地图输出=0GC所用时间(ms)=0CPU时间(ms)=1390物理内存(字节)快照=513449984虚拟内存(字节)快照=4122763264总已提交堆使用量(字节)=2058354688文件输入格式计数器字节读= 1762547文件输出格式计数器字节写= 0问题在开始标签:
conf.set("xmlinput.start", "<FICHER");`
conf.set("xmlinput.end", "</FICHER>");