我有多个压缩文件,每个压缩文件包含8个大小为5-10kb的xml文件。我将这些数据用于测试目的,否则实时数据有数千个xml文件。我编写了map only程序来解压缩压缩文件
for(FileStatus status : status_list){
this.unzip(status.getPath().toString() , DestPath, fs);
}
这个方法将创建文件并读取未压缩的数据
FSDataOutputStream out = fs.create(new Path(filePath));
byte[] bytesIn = new byte[BUFFER_SIZE];
int read = 0;
while ((read = zipIn.read(bytesIn)) != -1) {
out.write(bytesIn, 0, read);
}
out.flush();
out.close();
当mapper试图写多个文件名不同的文件时,hadoop返回 LeaseExpiredException.
15/09/26 19:53:46 INFO mapreduce.Job: Task Id : attempt_1443265405944_0005_m_000000_0, Status : FAILED
Error: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /test_poc/x/injection_00001 (163rd copy).xml (inode 400006): File does not exist. [Lease. Holder: DFSClient_attempt_1443265405944_0005_m_000000_0_-657768289_1, pendingcreates: 1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3431)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3236)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3074)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
at org.apache.hadoop.ipc.Client.call(Client.java:1476)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy12.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy13.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1430)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1226)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
我不知道如何处理多个压缩文件而不使用循环。I wrote map reduce program using MR2 API and using Hadoop 2.7.1 in psuedo distributed mode
。指针吗?
假设zipIn
是java.util.zip.ZipInputStream
,您不应该迭代调用getNextEntry
而不是读取字节吗?
我在代码中做了一些更改后解决了这个问题。在代码的第一部分中,我试图解压缩所有zip文件,而我应该可以访问spilts
。