I am writing a MapReduce job to test some calculations. I split my input into several maps so that each map does one part of the calculation; the result will be a list of (x, y) pairs which I want to flush into a SequenceFile.
The map part goes fine, but when the reducer kicks in I get this error: Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs://172.16.199.132:9000/user/hduser/FractalJob_1452257628594_410365359/out/reduce-out
Another observation: the error only appears when I use more than one map.
UPDATE: Here is my Mapper and Reducer code.
public static class RasterMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {

    private int imageS;
    private static Complex mapConstant;

    @Override
    public void setup(Context context) throws IOException {
        imageS = context.getConfiguration().getInt("image.size", -1);
        mapConstant = new Complex(context.getConfiguration().getDouble("constant.re", -1),
                context.getConfiguration().getDouble("constant.im", -1));
    }

    @Override
    public void map(IntWritable begin, IntWritable end, Context context) throws IOException, InterruptedException {
        for (int x = begin.get(); x < end.get(); x++) {
            for (int y = 0; y < imageS; y++) {
                float hue = 0, brightness = 0;
                int icolor = 0;
                Complex z = new Complex(2.0 * (x - imageS / 2) / (imageS / 2),
                        1.33 * (y - imageS / 2) / (imageS / 2));
                icolor = startCompute(generateZ(z), 0);
                if (icolor != -1) {
                    brightness = 1f;
                }
                hue = (icolor % 256) / 255.0f;
                Color color = Color.getHSBColor(hue, 1f, brightness);
                try {
                    context.write(new IntWritable(x + y * imageS), new IntWritable(color.getRGB()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static Complex generateZ(Complex z) {
        return (z.times(z)).plus(mapConstant);
    }

    private static int startCompute(Complex z, int color) {
        if (z.abs() > 4) {
            return color;
        } else if (color >= 255) {
            return -1;
        } else {
            color = color + 1;
            return startCompute(generateZ(z), color);
        }
    }
}
public static class ImageReducer extends Reducer<IntWritable, IntWritable, WritableComparable<?>, Writable> {

    private SequenceFile.Writer writer;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path outDir = new Path(conf.get(FileOutputFormat.OUTDIR));
        Path outFile = new Path(outDir, "pixels-out");
        Option optPath = SequenceFile.Writer.file(outFile);
        Option optKey = SequenceFile.Writer.keyClass(IntWritable.class);
        Option optVal = SequenceFile.Writer.valueClass(IntWritable.class);
        Option optCom = SequenceFile.Writer.compression(CompressionType.NONE);
        try {
            writer = SequenceFile.createWriter(conf, optCom, optKey, optPath, optVal);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        try {
            writer.append(key, values.iterator().next());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        writer.close();
    }
}
I hope you guys can help me. Thanks a lot.
EDIT:
Job failed as tasks failed. failedMaps:1 failedReduces:0
Looking closer at the logs, I noticed the problem comes from the way I feed data into the maps. I split my image size into several sequence files so that each map can read from one of them and compute the colors for the pixels in that area.
This is how I create the files:
try {
    int offset = 0;
    // generate an input file for each map task
    for (int i = 0; i < mapNr; ++i) {
        final Path file = new Path(input, "part" + i);
        final IntWritable begin = new IntWritable(offset);
        final IntWritable end = new IntWritable(offset + imgSize / mapNr);
        offset = end.get();
        Option optPath = SequenceFile.Writer.file(file);
        Option optKey = SequenceFile.Writer.keyClass(IntWritable.class);
        Option optVal = SequenceFile.Writer.valueClass(IntWritable.class);
        Option optCom = SequenceFile.Writer.compression(CompressionType.NONE);
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, optCom, optKey, optPath, optVal);
        try {
            writer.append(begin, end);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            writer.close();
        }
        System.out.println("Wrote input for Map #" + i);
    }
Log file:
16/01/10 19:06:04 INFO client.RMProxy: Connecting to ResourceManager at /172.16.199.132:8032
16/01/10 19:06:07 INFO input.FileInputFormat: Total input paths to process : 4
16/01/10 19:06:07 INFO mapreduce.JobSubmitter: number of splits:4
16/01/10 19:06:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1452444283951_0007
16/01/10 19:06:08 INFO impl.YarnClientImpl: Submitted application application_1452444283951_0007
16/01/10 19:06:08 INFO mapreduce.Job: The url to track the job: http://172.16.199.132:8088/proxy/application_1452444283951_0007/
16/01/10 19:06:08 INFO mapreduce.Job: Running job: job_1452444283951_0007
16/01/10 19:06:19 INFO mapreduce.Job: Job job_1452444283951_0007 running in uber mode : false
16/01/10 19:06:20 INFO mapreduce.Job: map 0% reduce 0%
16/01/10 19:06:49 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000002_0, Status : FAILED
16/01/10 19:06:49 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000001_0, Status : FAILED
16/01/10 19:06:49 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000000_0, Status : FAILED
16/01/10 19:06:49 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000003_0, Status : FAILED
16/01/10 19:07:07 INFO mapreduce.Job: map 25% reduce 0%
16/01/10 19:07:08 INFO mapreduce.Job: map 50% reduce 0%
16/01/10 19:07:10 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000001_1, Status : FAILED
16/01/10 19:07:11 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000003_1, Status : FAILED
16/01/10 19:07:25 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_r_000000_0, Status : FAILED
16/01/10 19:07:32 INFO mapreduce.Job: map 100% reduce 0%
16/01/10 19:07:32 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000003_2, Status : FAILED
16/01/10 19:07:32 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_m_000001_2, Status : FAILED
16/01/10 19:07:33 INFO mapreduce.Job: map 50% reduce 0%
16/01/10 19:07:43 INFO mapreduce.Job: map 75% reduce 0%
16/01/10 19:07:44 INFO mapreduce.Job: Task Id : attempt_1452444283951_0007_r_000000_1, Status : FAILED
16/01/10 19:07:50 INFO mapreduce.Job: map 100% reduce 100%
16/01/10 19:07:51 INFO mapreduce.Job: Job job_1452444283951_0007 failed with state FAILED due to: Task failed task_1452444283951_0007_m_000003
Job failed as tasks failed. failedMaps:1 failedReduces:0
16/01/10 19:07:51 INFO mapreduce.Job: Counters: 40
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=3048165
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=765
HDFS: Number of bytes written=0
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Failed map tasks=9
Failed reduce tasks=2
Killed reduce tasks=1
Launched map tasks=12
Launched reduce tasks=3
Other local map tasks=8
Data-local map tasks=4
Total time spent by all maps in occupied slots (ms)=239938
Total time spent by all reduces in occupied slots (ms)=34189
Total time spent by all map tasks (ms)=239938
Total time spent by all reduce tasks (ms)=34189
Total vcore-seconds taken by all map tasks=239938
Total vcore-seconds taken by all reduce tasks=34189
Total megabyte-seconds taken by all map tasks=245696512
Total megabyte-seconds taken by all reduce tasks=35009536
Map-Reduce Framework
Map input records=3
Map output records=270000
Map output bytes=2160000
Map output materialized bytes=2700018
Input split bytes=441
Combine input records=0
Spilled Records=270000
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=538
CPU time spent (ms)=5520
Physical memory (bytes) snapshot=643928064
Virtual memory (bytes) snapshot=2537975808
Total committed heap usage (bytes)=408760320
File Input Format Counters
Bytes Read=324
Constructing image...
Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs://172.16.199.132:9000/user/hduser/FractalJob_1452445557585_342741171/out/pixels-out
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1752)
at FractalJob.generateFractal(FractalJob.j..
This is the configuration:
conf.setInt("image.size", imgSize);
conf.setDouble("constant.re", FractalJob.constant.re());
conf.setDouble("constant.im", FractalJob.constant.im());
Job job = Job.getInstance(conf);
job.setJobName(FractalJob.class.getSimpleName());
job.setJarByClass(FractalJob.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(RasterMapper.class);
job.setReducerClass(ImageReducer.class);
job.setNumReduceTasks(1);
job.setSpeculativeExecution(false);
final Path input = new Path(filePath, "in");
final Path output = new Path(filePath, "out");
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
You don't need to worry about creating your own sequence files. MapReduce has an output format that does it automatically.
So, in your driver class you would use:
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
Then in the reducer you'd write:
context.write(key, values.iterator().next());
and remove the setup and cleanup methods (all of the manual SequenceFile.Writer code).
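With the manual writer gone, the reducer collapses to a few lines. A minimal sketch, reusing the class and type names from the code above:

```java
// Simplified ImageReducer: no SequenceFile.Writer of its own.
// SequenceFileOutputFormat creates the output file, so setup()
// and cleanup() are no longer needed.
public static class ImageReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Forward the first value for each pixel index; the output
        // format serializes the (key, value) pair into the job's
        // sequence file under FileOutputFormat's output directory.
        context.write(key, values.iterator().next());
    }
}
```

This also avoids the root of your FileNotFoundException: the framework, not your code, decides where the reduce output lives, and failed or speculative task attempts no longer race to write the same hand-built file.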
Incidentally, it looks like you don't need a reducer at all. If you're not doing any calculation in the reducer and not doing anything with grouping (which I don't think you are), then why not remove it? job.setOutputFormatClass(SequenceFileOutputFormat.class)
will write your mapper output to sequence files.
If you only want one output file, set
job.setNumReduceTasks(1);
and provided your final data is not bigger than one block size, you will get the output you need.
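Concretely, the driver changes would look roughly like this (a sketch based on your existing driver; when no reducer class is set, Hadoop runs the default identity Reducer, which simply forwards map output):

```java
// Driver sketch: no custom reducer class. The default (identity)
// reducer passes map output straight through, and a single reduce
// task collects everything into one sequence file.
job.setMapperClass(RasterMapper.class);
// no job.setReducerClass(...) -- identity reducer is used
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setNumReduceTasks(1);   // one reducer => one output file
```

If you instead set job.setNumReduceTasks(0), the reduce phase is skipped entirely and each map task writes its own part file, which is fine if you can read the output directory as a whole.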
It's worth noting that you currently output only one value per key: you should make sure that is what you want, and include a loop in the reducer to iterate over the values if it isn't.
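If more than one value can arrive for the same key (for example, duplicate pixel indices), the reduce body would loop instead of taking only the first value. A sketch:

```java
@Override
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    // Emit every value grouped under this key, not just the first one.
    for (IntWritable value : values) {
        context.write(key, value);
    }
}
```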