我有一个没有任何化简器的mapreduce作业,它可以解析输入文件并以Parquet格式在映射器中写入磁盘上的一些输出。由于此作业可以将多个文件夹中的文件作为输入(每个日期一个文件夹),因此我还希望将输出拆分为文件夹,例如:
01JAN15
output-0000
output-0001
02JAN15
output-0000
output-0001
我查看了文档中的 MultipleOutput 格式类,但它似乎只能在 reduce 部分中的几个文件夹中写入。
不知何故,写入同一目录中的多个文件是有效的,但是一旦我尝试在多个目录中写入,我就会出现异常(也许是因为某些映射器试图同时创建相同的目录?
仅供参考,我的代码在映射器中看起来像这样:
mos.write("pb", null, message, date + "/output");
我像这样定义输出格式:
MultipleOutputs.addNamedOutput(job, "pb", ProtoParquetOutputFormat.class,
Void.class, com.google.protobuf.Message.class);
我得到的例外是:
15/01/11 15:05:09 WARN ipc.Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:400)
at java.util.concurrent.FutureTask.get(FutureTask.java:187)
at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1046)
at org.apache.hadoop.ipc.Client.call(Client.java:1441)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1220)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1210)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1200)
at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:271)
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:238)
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:231)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1498)
at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:272)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
你知道我想做的是否可能吗?我做错了什么?谢谢!
你的意思,但不幸的是(据我所知)你需要自己做
只需创建从配置和实现工具接口扩展的驱动程序类。然后,您可以简单地配置一个回调,该回调将在MapRed执行完成后调用,然后只需编写代码即可移动相应文件夹中的文件。
这是链接
Mapred 完成后的回调
您可以使用分区输出到不同的文件中。
一个输出文件不能由多个进程(映射器或化简器)写入,因此为了生成多个输出文件,我要么必须定义自定义分区,要么在化简器中对数据进行分组,并在输出文件名中具有键。映射器无法将来自多个输入文件的数据写入同一文件中。