I am new to HDInsight and the Hadoop MapReduce concepts. I am trying to merge multiple XML files into a single XML file with a MapReduce program. My intent is to merge each XML file into the destination XML file, prepending and appending the file name as start and end tags. For example, the XML files below should be merged into the single XML file shown:
Input XML files
<xml><a></a></xml>
<xml><b></b></xml>
<xml><c></c></xml>
Output XML file
<xml>
<File1Name><xml><a></a></xml></File1Name>
<File2Name><xml><b></b></xml></File2Name>
<File3Name><xml><c></c></xml></File3Name>
</xml>
Question 1: Is it possible to map one XML file to each mapper and create a key/value pair, with the key as the file name and the value as the XML file's contents with the file name prepended and appended as start and end tags, and then have the reducer merge all of the XML into a single context and output the XML shown above?
Question 2: How can I get the file name as the key in the mapper code?
Answer 1: I would not recommend sending just a single XML file to each mapper unless the files are over 1 GB. Instead, you can send a list of XML locations to the mappers, then open each location in the mapper code and extract the data into your output.
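As for Question 2: in the simple case where each mapper processes one whole file through the stock FileInputFormat, the file name can be read straight from the mapper's input split. A minimal sketch, assuming the default TextInputFormat (the class name here is illustrative, not from the original code):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical mapper: keys each record by the name of the file it came from.
public class FileNameMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // With FileInputFormat the split is a FileSplit, which carries
        // the path of the file this mapper is reading.
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        context.write(new Text(fileName), value);
    }
}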
Answer 2: If you use Azure Blob Storage, you can list all of the blobs in a container and assign them to input splits.
How to create your list of InputSplits:
ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
/* Do this for each path we receive. Creates splits in the order
   (s1,1), (s2,1) ... (sN,1), (s1,2) ... (sN,2), etc., where s = input path. */
for (int i = numMinNameHashSplits; i <= Math.min(numMaxNameHashSplits, numNameHashSplits - 1); i++) {
    for (Path inputPath : inputPaths) {
        ret.add(new ParseDirectoryInputSplit(inputPath.toString(), i));
        System.out.println(i + " " + inputPath.toString());
    }
}
return ret;
} // end of getSplits
} // end of the custom InputFormat
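The snippet above constructs ParseDirectoryInputSplit objects, a custom class whose definition is not shown. A minimal sketch of what it might look like, assuming only what the surrounding code implies (getDirectoryPath() and getNameHashSlot()); the field names and serialization details are my own:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

// Sketch of the custom split: it carries a directory path plus the hash slot
// this split is responsible for, and must be Writable so Hadoop can ship it.
public class ParseDirectoryInputSplit extends InputSplit implements Writable {
    private String directoryPath;
    private int nameHashSlot;

    public ParseDirectoryInputSplit() { } // required for deserialization

    public ParseDirectoryInputSplit(String directoryPath, int nameHashSlot) {
        this.directoryPath = directoryPath;
        this.nameHashSlot = nameHashSlot;
    }

    public Path getDirectoryPath() { return new Path(directoryPath); }

    public int getNameHashSlot() { return nameHashSlot; }

    @Override
    public long getLength() { return 0; } // size is not known up front

    @Override
    public String[] getLocations() { return new String[0]; } // no locality hints for blob storage

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(directoryPath);
        out.writeInt(nameHashSlot);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        directoryPath = in.readUTF();
        nameHashSlot = in.readInt();
    }
}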
Once the List<InputSplit> is assembled, each InputSplit is handed to a record reader class, where each key/value pair is read and then passed to the map task. The initialization of the record reader class uses the InputSplit, a string representing the location of a "folder" of invoices in blob storage, to return a list of all blobs within the folder (the blobs variable below). The Java code below demonstrates the creation of the record reader for each hash slot and the resulting list of blobs in that location.
public class ParseDirectoryFileNameRecordReader
        extends RecordReader<IntWritable, Text> {
    private int nameHashSlot;
    private int numNameHashSlots;
    private Path myDir;
    private Path currentPath;
    private Iterator<ListBlobItem> blobs;
    private int currentLocation;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        myDir = ((ParseDirectoryInputSplit) split).getDirectoryPath();
        // getNameHashSlot tells us which slot this record reader is responsible for
        nameHashSlot = ((ParseDirectoryInputSplit) split).getNameHashSlot();
        // gets the total number of hash slots
        numNameHashSlots = getNumNameHashSplits(context.getConfiguration());
        // gets the input credentials for the storage account assigned to this record reader
        String inputCreds = getInputCreds(context.getConfiguration());
        // break the directory path apart to get the account name
        String[] authComponents = myDir.toUri().getAuthority().split("@");
        String accountName = authComponents[1].split("\\.")[0];
        String containerName = authComponents[0];
        String accountKey = Utils.returnInputkey(inputCreds, accountName);
        System.out.println("This mapper is assigned the following account: " + accountName);
        StorageCredentials creds = new StorageCredentialsAccountAndKey(accountName, accountKey);
        CloudStorageAccount account = new CloudStorageAccount(creds);
        CloudBlobClient client = account.createCloudBlobClient();
        CloudBlobContainer container = client.getContainerReference(containerName);
        blobs = container.listBlobs(myDir.toUri().getPath().substring(1) + "/", true,
                EnumSet.noneOf(BlobListingDetails.class), null, null).iterator();
        currentLocation = -1;
    }
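getNumNameHashSplits and getInputCreds are helper methods whose implementations are not shown; presumably both just read values from the job configuration. A hedged sketch, with configuration key names invented for illustration:

    // Assumed helpers inside the record reader class; Configuration is
    // org.apache.hadoop.conf.Configuration. The key names are placeholders.
    private static int getNumNameHashSplits(Configuration conf) {
        return conf.getInt("parse.directory.num.name.hash.splits", 1);
    }

    private static String getInputCreds(Configuration conf) {
        return conf.get("parse.directory.input.creds");
    }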
Once initialized, the record reader is used to pass the next key to the map task. This is controlled by the nextKeyValue method, which is called each time the map task requests a new record. The Java code below demonstrates this.
    // This checks whether the next key/value is assigned to this task or to another
    // mapper. If it is assigned to this task, the location is passed to the mapper;
    // otherwise, return false.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        while (blobs.hasNext()) {
            ListBlobItem currentBlob = blobs.next();
            // Returns a number between 1 and the number of hash slots. If it matches the
            // number assigned to this mapper and the blob's length is greater than 0,
            // return the path to the map function.
            if (doesBlobMatchNameHash(currentBlob) && getBlobLength(currentBlob) > 0) {
                String[] pathComponents = currentBlob.getUri().getPath().split("/");
                String pathWithoutContainer =
                        currentBlob.getUri().getPath().substring(pathComponents[1].length() + 1);
                currentPath = new Path(myDir.toUri().getScheme(),
                        myDir.toUri().getAuthority(), pathWithoutContainer);
                currentLocation++;
                return true;
            }
        }
        return false;
    }
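The remaining RecordReader methods are not shown in the original; a plausible sketch that hands the current blob path to the map task as the value and the blob's index as the key:

    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return new IntWritable(currentLocation); // index of the current blob in this slot
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return new Text(currentPath.toString()); // consumed by the map function below
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0; // the total number of matching blobs is unknown in advance
    }

    @Override
    public void close() throws IOException { }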
The logic in the map function is then simply as follows, with inputStream containing the entire XML string:
Path inputFile = new Path(value.toString());
FileSystem fs = inputFile.getFileSystem(context.getConfiguration());
//Input stream contains all data from the blob in the location provided by Text
FSDataInputStream inputStream = fs.open(inputFile);
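Putting the pieces together, here is a hedged sketch of a complete map function and a matching reducer that produce the merged output from the question. The class names, the constant grouping key, and the use of the raw file name as a wrapper tag are my own assumptions, not the original author's code (each class would live in its own source file):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical mapper: reads the whole blob at the path supplied by the record
// reader, wraps it in its file name as start and end tags, and emits it under
// one constant key so a single reduce call sees every file.
public class MergeXmlMapper extends Mapper<IntWritable, Text, Text, Text> {
    @Override
    protected void map(IntWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Path inputFile = new Path(value.toString());
        FileSystem fs = inputFile.getFileSystem(context.getConfiguration());
        byte[] buffer = new byte[(int) fs.getFileStatus(inputFile).getLen()];
        FSDataInputStream inputStream = fs.open(inputFile);
        try {
            IOUtils.readFully(inputStream, buffer, 0, buffer.length);
        } finally {
            inputStream.close();
        }
        String fileName = inputFile.getName();
        String wrapped = "<" + fileName + ">"
                + new String(buffer, StandardCharsets.UTF_8)
                + "</" + fileName + ">";
        context.write(new Text("merged"), new Text(wrapped));
    }
}

// Hypothetical reducer: concatenates every wrapped document between a single
// pair of root tags, yielding the merged XML from the question.
public class MergeXmlReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder merged = new StringBuilder("<xml>\n");
        for (Text value : values) {
            merged.append(value.toString()).append("\n");
        }
        merged.append("</xml>");
        context.write(NullWritable.get(), new Text(merged.toString()));
    }
}

Note that using the raw file name as an element name only works if the name is a valid XML tag (no spaces, not starting with a digit); in practice you may want to sanitize it or emit it as an attribute instead.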
Resources:
http://www.andrewsmoll.com/3-hacks-for-hadoop-and-hdinsight-clusters/ (Hack #3)
http://blogs.msdn.com/b/mostlytrue/archive/2014/04/10/merging-small-files-on-hdinsight.aspx